[ARVADOS] created: 2.1.0-1488-gbda9093be

Git user git@public.arvados.org
Fri Oct 8 15:21:38 UTC 2021


        at  bda9093be4d24d45a6fff29148fbb5438e283897 (commit)


commit bda9093be4d24d45a6fff29148fbb5438e283897
Author: Tom Clegg <tom@curii.com>
Date:   Fri Oct 8 11:04:53 2021 -0400

    16347: Add LocalKeepLogsToContainerLog config (none/errors/all).
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 429b9fe48..106ecdfac 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -929,6 +929,21 @@ Clusters:
       # in memory for the duration of the container.
       LocalKeepBlobBuffersPerVCPU: 0
 
+      # When running a dedicated keepstore process for a container
+      # (see LocalKeepBlobBuffersPerVCPU), write keepstore log
+      # messages to keepstore.txt in the container's log collection.
+      #
+      # These log messages can reveal some volume configuration
+      # details, error messages from the cloud storage provider, etc.,
+      # which are not otherwise visible to users.
+      #
+      # Accepted values:
+      # * "none" -- no keepstore.txt file
+      # * "all" -- all logs, including request and response lines
+      # * "errors" -- all logs except "response" logs with 2xx
+      #   response codes and "request" logs
+      LocalKeepLogsToContainerLog: none
+
       Logging:
         # When you run the db:delete_old_container_logs task, it will find
         # containers that have been finished for at least this many seconds,
diff --git a/lib/config/export.go b/lib/config/export.go
index f400dabbf..e36d6e76c 100644
--- a/lib/config/export.go
+++ b/lib/config/export.go
@@ -120,6 +120,7 @@ var whitelist = map[string]bool{
 	"Containers.JobsAPI.Enable":                           true,
 	"Containers.JobsAPI.GitInternalDir":                   false,
 	"Containers.LocalKeepBlobBuffersPerVCPU":              false,
+	"Containers.LocalKeepLogsToContainerLog":              false,
 	"Containers.Logging":                                  false,
 	"Containers.LogReuseDecisions":                        false,
 	"Containers.LSF":                                      false,
diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go
index dfa406bbe..4207e6e4d 100644
--- a/lib/config/generated_config.go
+++ b/lib/config/generated_config.go
@@ -935,6 +935,21 @@ Clusters:
       # in memory for the duration of the container.
       LocalKeepBlobBuffersPerVCPU: 0
 
+      # When running a dedicated keepstore process for a container
+      # (see LocalKeepBlobBuffersPerVCPU), write keepstore log
+      # messages to keepstore.txt in the container's log collection.
+      #
+      # These log messages can reveal some volume configuration
+      # details, error messages from the cloud storage provider, etc.,
+      # which are not otherwise visible to users.
+      #
+      # Accepted values:
+      # * "none" -- no keepstore.txt file
+      # * "all" -- all logs, including request and response lines
+      # * "errors" -- all logs except "response" logs with 2xx
+      #   response codes and "request" logs
+      LocalKeepLogsToContainerLog: none
+
       Logging:
         # When you run the db:delete_old_container_logs task, it will find
         # containers that have been finished for at least this many seconds,
diff --git a/lib/config/load.go b/lib/config/load.go
index b6375c820..a7419331f 100644
--- a/lib/config/load.go
+++ b/lib/config/load.go
@@ -295,6 +295,7 @@ func (ldr *Loader) Load() (*arvados.Config, error) {
 			ldr.checkToken(fmt.Sprintf("Clusters.%s.SystemRootToken", id), cc.SystemRootToken),
 			ldr.checkToken(fmt.Sprintf("Clusters.%s.Collections.BlobSigningKey", id), cc.Collections.BlobSigningKey),
 			checkKeyConflict(fmt.Sprintf("Clusters.%s.PostgreSQL.Connection", id), cc.PostgreSQL.Connection),
+			ldr.checkEnum("Containers.LocalKeepLogsToContainerLog", cc.Containers.LocalKeepLogsToContainerLog, "none", "all", "errors"),
 			ldr.checkEmptyKeepstores(cc),
 			ldr.checkUnlistedKeepstores(cc),
 			ldr.checkLocalKeepstoreVolumes(cc),
@@ -339,6 +340,15 @@ func (ldr *Loader) checkToken(label, token string) error {
 	return nil
 }
 
+func (ldr *Loader) checkEnum(label, value string, accepted ...string) error {
+	for _, s := range accepted {
+		if s == value {
+			return nil
+		}
+	}
+	return fmt.Errorf("%s: unacceptable value %q: must be one of %q", label, value, accepted)
+}
+
 func (ldr *Loader) setImplicitStorageClasses(cfg *arvados.Config) error {
 cluster:
 	for id, cc := range cfg.Clusters {
diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index ce060151d..c9456ccc7 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -1775,14 +1775,29 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 		return 1
 	}
 
-	if keepstore != nil {
-		w, err := cr.NewLogWriter("keepstore")
+	if keepstore == nil {
+		// Nothing is written to keepstoreLogbuf, no need to
+		// call SetWriter.
+	} else if logWhat := conf.Cluster.Containers.LocalKeepLogsToContainerLog; logWhat == "none" {
+		keepstoreLogbuf.SetWriter(io.Discard)
+	} else {
+		logwriter, err := cr.NewLogWriter("keepstore")
 		if err != nil {
 			log.Print(err)
 			return 1
 		}
-		cr.keepstoreLogger = NewThrottledLogger(w)
-		err = keepstoreLogbuf.SetWriter(cr.keepstoreLogger)
+		cr.keepstoreLogger = NewThrottledLogger(logwriter)
+
+		var writer io.WriteCloser = cr.keepstoreLogger
+		if logWhat == "errors" {
+			writer = &filterKeepstoreErrorsOnly{WriteCloser: writer}
+		} else if logWhat != "all" {
+			// should have been caught earlier by
+			// dispatcher's config loader
+			log.Printf("invalid value for Containers.LocalKeepLogsToContainerLog: %q", logWhat)
+			return 1
+		}
+		err = keepstoreLogbuf.SetWriter(writer)
 		if err != nil {
 			log.Print(err)
 			return 1
diff --git a/lib/crunchrun/integration_test.go b/lib/crunchrun/integration_test.go
index 597490c57..8adddd705 100644
--- a/lib/crunchrun/integration_test.go
+++ b/lib/crunchrun/integration_test.go
@@ -87,9 +87,6 @@ func (s *integrationSuite) SetUpSuite(c *C) {
 	})
 	c.Assert(err, IsNil)
 	c.Logf("input pdh %s", s.input.PortableDataHash)
-
-	s.logCollection = arvados.Collection{}
-	s.outputCollection = arvados.Collection{}
 }
 
 func (s *integrationSuite) TearDownSuite(c *C) {
@@ -106,6 +103,8 @@ func (s *integrationSuite) SetUpTest(c *C) {
 	s.stdin = bytes.Buffer{}
 	s.stdout = bytes.Buffer{}
 	s.stderr = bytes.Buffer{}
+	s.logCollection = arvados.Collection{}
+	s.outputCollection = arvados.Collection{}
 	s.cr = arvados.ContainerRequest{
 		Priority:       1,
 		State:          "Committed",
@@ -165,35 +164,55 @@ func (s *integrationSuite) TestRunTrivialContainerWithSingularity(c *C) {
 }
 
 func (s *integrationSuite) TestRunTrivialContainerWithLocalKeepstore(c *C) {
-	cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
-	c.Assert(err, IsNil)
-	cluster, err := cfg.GetCluster("")
-	c.Assert(err, IsNil)
-	for uuid, volume := range cluster.Volumes {
-		volume.AccessViaHosts = nil
-		volume.Replication = 2
-		cluster.Volumes[uuid] = volume
-	}
+	for _, trial := range []struct {
+		logConfig           string
+		matchGetReq         Checker
+		matchPutReq         Checker
+		matchStartupMessage Checker
+	}{
+		{"none", Not(Matches), Not(Matches), Not(Matches)},
+		{"all", Matches, Matches, Matches},
+		{"errors", Not(Matches), Not(Matches), Matches},
+	} {
+		c.Logf("=== testing with Containers.LocalKeepLogsToContainerLog: %q", trial.logConfig)
+		s.SetUpTest(c)
 
-	s.stdin.Reset()
-	err = json.NewEncoder(&s.stdin).Encode(ConfigData{
-		Env:         nil,
-		KeepBuffers: 1,
-		Cluster:     cluster,
-	})
-	c.Assert(err, IsNil)
+		cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+		c.Assert(err, IsNil)
+		cluster, err := cfg.GetCluster("")
+		c.Assert(err, IsNil)
+		for uuid, volume := range cluster.Volumes {
+			volume.AccessViaHosts = nil
+			volume.Replication = 2
+			cluster.Volumes[uuid] = volume
+		}
+		cluster.Containers.LocalKeepLogsToContainerLog = trial.logConfig
 
-	s.engine = "docker"
-	s.testRunTrivialContainer(c)
+		s.stdin.Reset()
+		err = json.NewEncoder(&s.stdin).Encode(ConfigData{
+			Env:         nil,
+			KeepBuffers: 1,
+			Cluster:     cluster,
+		})
+		c.Assert(err, IsNil)
 
-	fs, err := s.logCollection.FileSystem(s.client, s.kc)
-	c.Assert(err, IsNil)
-	f, err := fs.Open("keepstore.txt")
-	c.Assert(err, IsNil)
-	buf, err := ioutil.ReadAll(f)
-	c.Assert(err, IsNil)
-	c.Check(string(buf), Matches, `(?ms).*"reqMethod":"GET".*`)
-	c.Check(string(buf), Matches, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`)
+		s.engine = "docker"
+		s.testRunTrivialContainer(c)
+
+		fs, err := s.logCollection.FileSystem(s.client, s.kc)
+		c.Assert(err, IsNil)
+		f, err := fs.Open("keepstore.txt")
+		if trial.logConfig == "none" {
+			c.Check(err, NotNil)
+			c.Check(os.IsNotExist(err), Equals, true)
+		} else {
+			c.Assert(err, IsNil)
+			buf, err := ioutil.ReadAll(f)
+			c.Assert(err, IsNil)
+			c.Check(string(buf), trial.matchGetReq, `(?ms).*"reqMethod":"GET".*`)
+			c.Check(string(buf), trial.matchPutReq, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`)
+		}
+	}
 }
 
 func (s *integrationSuite) testRunTrivialContainer(c *C) {
diff --git a/lib/crunchrun/logging.go b/lib/crunchrun/logging.go
index 050894383..76a55c499 100644
--- a/lib/crunchrun/logging.go
+++ b/lib/crunchrun/logging.go
@@ -7,6 +7,7 @@ package crunchrun
 import (
 	"bufio"
 	"bytes"
+	"encoding/json"
 	"fmt"
 	"io"
 	"log"
@@ -404,3 +405,53 @@ func loadLogThrottleParams(clnt IArvadosClient) {
 	loadDuration(&crunchLogUpdatePeriod, "crunchLogUpdatePeriod")
 
 }
+
+type filterKeepstoreErrorsOnly struct {
+	io.WriteCloser
+	buf []byte
+}
+
+func (f *filterKeepstoreErrorsOnly) Write(p []byte) (int, error) {
+	log.Printf("filterKeepstoreErrorsOnly: write %q", p)
+	f.buf = append(f.buf, p...)
+	start := 0
+	for i := len(f.buf) - len(p); i < len(f.buf); i++ {
+		if f.buf[i] == '\n' {
+			if f.check(f.buf[start:i]) {
+				_, err := f.WriteCloser.Write(f.buf[start : i+1])
+				if err != nil {
+					return 0, err
+				}
+			}
+			start = i + 1
+		}
+	}
+	if start > 0 {
+		copy(f.buf, f.buf[start:])
+		f.buf = f.buf[:len(f.buf)-start]
+	}
+	return len(p), nil
+}
+
+func (f *filterKeepstoreErrorsOnly) check(line []byte) bool {
+	if len(line) == 0 {
+		return false
+	}
+	if line[0] != '{' {
+		return true
+	}
+	var m map[string]interface{}
+	err := json.Unmarshal(line, &m)
+	if err != nil {
+		return true
+	}
+	if m["msg"] == "request" {
+		return false
+	}
+	if m["msg"] == "response" {
+		if code, _ := m["respStatusCode"].(float64); code >= 200 && code < 300 {
+			return false
+		}
+	}
+	return true
+}
diff --git a/lib/crunchrun/logging_test.go b/lib/crunchrun/logging_test.go
index 55460af37..fdd4f27b7 100644
--- a/lib/crunchrun/logging_test.go
+++ b/lib/crunchrun/logging_test.go
@@ -5,7 +5,9 @@
 package crunchrun
 
 import (
+	"bytes"
 	"fmt"
+	"io"
 	"strings"
 	"testing"
 	"time"
@@ -13,6 +15,7 @@ import (
 	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
 	. "gopkg.in/check.v1"
+	check "gopkg.in/check.v1"
 )
 
 type LoggingTestSuite struct {
@@ -219,3 +222,34 @@ func (s *LoggingTestSuite) testWriteLogsWithRateLimit(c *C, throttleParam string
 	c.Check(true, Equals, strings.Contains(stderrLog, expected))
 	c.Check(string(kc.Content), Equals, logtext)
 }
+
+type filterSuite struct{}
+
+var _ = Suite(&filterSuite{})
+
+func (*filterSuite) TestFilterKeepstoreErrorsOnly(c *check.C) {
+	var buf bytes.Buffer
+	f := filterKeepstoreErrorsOnly{WriteCloser: nopCloser{&buf}}
+	for _, s := range []string{
+		"not j",
+		"son\n" + `{"msg":"foo"}` + "\n{}\n" + `{"msg":"request"}` + "\n" + `{"msg":1234}` + "\n\n",
+		"\n[\n",
+		`{"msg":"response","respStatusCode":404,"foo": "bar"}` + "\n",
+		`{"msg":"response","respStatusCode":206}` + "\n",
+	} {
+		f.Write([]byte(s))
+	}
+	c.Check(buf.String(), check.Equals, `not json
+{"msg":"foo"}
+{}
+{"msg":1234}
+[
+{"msg":"response","respStatusCode":404,"foo": "bar"}
+`)
+}
+
+type nopCloser struct {
+	io.Writer
+}
+
+func (nopCloser) Close() error { return nil }
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index b84e1eefa..e736f79fd 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -436,6 +436,7 @@ type ContainersConfig struct {
 	UsePreemptibleInstances     bool
 	RuntimeEngine               string
 	LocalKeepBlobBuffersPerVCPU int
+	LocalKeepLogsToContainerLog string
 
 	JobsAPI struct {
 		Enable         string
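
A note on the "errors" setting above: it keeps every keepstore log line except
"request" lines and "response" lines with 2xx status codes, so error output
survives while routine traffic is dropped. A minimal standalone sketch of that
rule (the keep() helper and the sample lines are illustrative only; the real
filtering is filterKeepstoreErrorsOnly.check in the diff above):

    package main

    import (
        "encoding/json"
        "fmt"
    )

    // keep mirrors the "errors" rule: drop "request" lines and 2xx "response"
    // lines, keep everything else (including non-JSON output such as panics).
    func keep(line []byte) bool {
        if len(line) == 0 {
            return false
        }
        if line[0] != '{' {
            return true
        }
        var m map[string]interface{}
        if json.Unmarshal(line, &m) != nil {
            return true
        }
        if m["msg"] == "request" {
            return false
        }
        if m["msg"] == "response" {
            if code, _ := m["respStatusCode"].(float64); code >= 200 && code < 300 {
                return false
            }
        }
        return true
    }

    func main() {
        for _, s := range []string{
            `{"msg":"request","reqMethod":"GET"}`,
            `{"msg":"response","respStatusCode":200}`,
            `{"msg":"response","respStatusCode":404}`,
            `{"msg":"error writing block to backend volume"}`,
        } {
            fmt.Printf("keep=%-5v %s\n", keep([]byte(s)), s)
        }
    }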

commit 6b39f96311c832de21bcacc3f17a611682d522a9
Author: Tom Clegg <tom@curii.com>
Date:   Thu Oct 7 16:22:54 2021 -0400

    16347: Add test case. Flush keepstore logs in CommitLogs.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index d70dd1c42..ce060151d 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -137,6 +137,8 @@ type ContainerRunner struct {
 	finalState    string
 	parentTemp    string
 
+	keepstoreLogger  io.WriteCloser
+	keepstoreLogbuf  *bufThenWrite
 	statLogger       io.WriteCloser
 	statReporter     *crunchstat.Reporter
 	hoststatLogger   io.WriteCloser
@@ -1277,6 +1279,16 @@ func (runner *ContainerRunner) CommitLogs() error {
 		runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
 	}()
 
+	if runner.keepstoreLogger != nil {
+		// Flush any buffered logs from our local keepstore
+		// process.  Discard anything logged after this point
+		// -- it won't end up in the log collection, so
+		// there's no point writing it to the collectionfs.
+		runner.keepstoreLogbuf.SetWriter(io.Discard)
+		runner.keepstoreLogger.Close()
+		runner.keepstoreLogger = nil
+	}
+
 	if runner.LogsPDH != nil {
 		// If we have already assigned something to LogsPDH,
 		// we must be closing the re-opened log, which won't
@@ -1285,6 +1297,7 @@ func (runner *ContainerRunner) CommitLogs() error {
 		// -- it exists only to send logs to other channels.
 		return nil
 	}
+
 	saved, err := runner.saveLogCollection(true)
 	if err != nil {
 		return fmt.Errorf("error saving log collection: %s", err)
@@ -1647,6 +1660,7 @@ func NewContainerRunner(dispatcherClient *arvados.Client,
 }
 
 func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+	log := log.New(stderr, "", 0)
 	flags := flag.NewFlagSet(prog, flag.ContinueOnError)
 	statInterval := flags.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
 	cgroupRoot := flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
@@ -1702,9 +1716,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 
 	var conf ConfigData
 	if *stdinConfig {
-		err := json.NewDecoder(os.Stdin).Decode(&conf)
+		err := json.NewDecoder(stdin).Decode(&conf)
 		if err != nil {
-			log.Print(err)
+			log.Printf("decode stdin: %s", err)
 			return 1
 		}
 		for k, v := range conf.Env {
@@ -1730,8 +1744,8 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 		arvadosclient.CertFiles = []string{*caCertsPath}
 	}
 
-	var keepstoreLog bufThenWrite
-	keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLog, stderr))
+	var keepstoreLogbuf bufThenWrite
+	keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLogbuf, stderr))
 	if err != nil {
 		log.Print(err)
 		return 1
@@ -1747,9 +1761,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 	}
 	api.Retries = 8
 
-	kc, kcerr := keepclient.MakeKeepClient(api)
-	if kcerr != nil {
-		log.Printf("%s: %v", containerUUID, kcerr)
+	kc, err := keepclient.MakeKeepClient(api)
+	if err != nil {
+		log.Printf("%s: %v", containerUUID, err)
 		return 1
 	}
 	kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
@@ -1767,11 +1781,13 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 			log.Print(err)
 			return 1
 		}
-		err = keepstoreLog.SetWriter(NewThrottledLogger(w))
+		cr.keepstoreLogger = NewThrottledLogger(w)
+		err = keepstoreLogbuf.SetWriter(cr.keepstoreLogger)
 		if err != nil {
 			log.Print(err)
 			return 1
 		}
+		cr.keepstoreLogbuf = &keepstoreLogbuf
 	}
 
 	switch *runtimeEngine {
@@ -1895,13 +1911,20 @@ func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, er
 		return nil, err
 	}
 	cmd := exec.Command("/proc/self/exe", "keepstore", "-config=-")
+	if target, err := os.Readlink(cmd.Path); err == nil && strings.HasSuffix(target, ".test") {
+		// If we're a 'go test' process, running
+		// /proc/self/exe would start the test suite in a
+		// child process, which is not what we want.
+		cmd.Path, _ = exec.LookPath("go")
+		cmd.Args = append([]string{"go", "run", "../../cmd/arvados-server"}, cmd.Args[1:]...)
+		cmd.Env = os.Environ()
+	}
 	cmd.Stdin = &confJSON
 	cmd.Stdout = logbuf
 	cmd.Stderr = logbuf
-	cmd.Env = []string{
+	cmd.Env = append(cmd.Env,
 		"GOGC=10",
-		"ARVADOS_SERVICE_INTERNAL_URL=" + url,
-	}
+		"ARVADOS_SERVICE_INTERNAL_URL="+url)
 	err = cmd.Start()
 	if err != nil {
 		return nil, fmt.Errorf("error starting keepstore process: %w", err)
diff --git a/lib/crunchrun/integration_test.go b/lib/crunchrun/integration_test.go
index c688248c6..597490c57 100644
--- a/lib/crunchrun/integration_test.go
+++ b/lib/crunchrun/integration_test.go
@@ -6,6 +6,7 @@ package crunchrun
 
 import (
 	"bytes"
+	"encoding/json"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -13,9 +14,11 @@ import (
 	"os/exec"
 	"strings"
 
+	"git.arvados.org/arvados.git/lib/config"
 	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
 	"git.arvados.org/arvados.git/sdk/go/arvadostest"
+	"git.arvados.org/arvados.git/sdk/go/ctxlog"
 	"git.arvados.org/arvados.git/sdk/go/keepclient"
 	. "gopkg.in/check.v1"
 )
@@ -33,6 +36,9 @@ type integrationSuite struct {
 	client *arvados.Client
 	ac     *arvadosclient.ArvadosClient
 	kc     *keepclient.KeepClient
+
+	logCollection    arvados.Collection
+	outputCollection arvados.Collection
 }
 
 func (s *integrationSuite) SetUpSuite(c *C) {
@@ -49,7 +55,12 @@ func (s *integrationSuite) SetUpSuite(c *C) {
 	out, err = exec.Command("arv-keepdocker", "--no-resume", "busybox:uclibc").Output()
 	imageUUID := strings.TrimSpace(string(out))
 	c.Logf("image uuid %s", imageUUID)
-	c.Assert(err, IsNil)
+	if !c.Check(err, IsNil) {
+		if err, ok := err.(*exec.ExitError); ok {
+			c.Logf("%s", err.Stderr)
+		}
+		c.Fail()
+	}
 	err = arvados.NewClientFromEnv().RequestAndDecode(&s.image, "GET", "arvados/v1/collections/"+imageUUID, nil, nil)
 	c.Assert(err, IsNil)
 	c.Logf("image pdh %s", s.image.PortableDataHash)
@@ -76,6 +87,9 @@ func (s *integrationSuite) SetUpSuite(c *C) {
 	})
 	c.Assert(err, IsNil)
 	c.Logf("input pdh %s", s.input.PortableDataHash)
+
+	s.logCollection = arvados.Collection{}
+	s.outputCollection = arvados.Collection{}
 }
 
 func (s *integrationSuite) TearDownSuite(c *C) {
@@ -150,17 +164,56 @@ func (s *integrationSuite) TestRunTrivialContainerWithSingularity(c *C) {
 	s.testRunTrivialContainer(c)
 }
 
+func (s *integrationSuite) TestRunTrivialContainerWithLocalKeepstore(c *C) {
+	cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+	c.Assert(err, IsNil)
+	cluster, err := cfg.GetCluster("")
+	c.Assert(err, IsNil)
+	for uuid, volume := range cluster.Volumes {
+		volume.AccessViaHosts = nil
+		volume.Replication = 2
+		cluster.Volumes[uuid] = volume
+	}
+
+	s.stdin.Reset()
+	err = json.NewEncoder(&s.stdin).Encode(ConfigData{
+		Env:         nil,
+		KeepBuffers: 1,
+		Cluster:     cluster,
+	})
+	c.Assert(err, IsNil)
+
+	s.engine = "docker"
+	s.testRunTrivialContainer(c)
+
+	fs, err := s.logCollection.FileSystem(s.client, s.kc)
+	c.Assert(err, IsNil)
+	f, err := fs.Open("keepstore.txt")
+	c.Assert(err, IsNil)
+	buf, err := ioutil.ReadAll(f)
+	c.Assert(err, IsNil)
+	c.Check(string(buf), Matches, `(?ms).*"reqMethod":"GET".*`)
+	c.Check(string(buf), Matches, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`)
+}
+
 func (s *integrationSuite) testRunTrivialContainer(c *C) {
 	if err := exec.Command("which", s.engine).Run(); err != nil {
 		c.Skip(fmt.Sprintf("%s: %s", s.engine, err))
 	}
 	s.cr.Command = []string{"sh", "-c", "cat /mnt/in/inputfile >/mnt/out/inputfile && cat /mnt/json >/mnt/out/json && ! touch /mnt/in/shouldbereadonly && mkdir /mnt/out/emptydir"}
 	s.setup(c)
-	code := command{}.RunCommand("crunch-run", []string{
+
+	args := []string{
 		"-runtime-engine=" + s.engine,
 		"-enable-memory-limit=false",
 		s.cr.ContainerUUID,
-	}, &s.stdin, io.MultiWriter(&s.stdout, os.Stderr), io.MultiWriter(&s.stderr, os.Stderr))
+	}
+	if s.stdin.Len() > 0 {
+		args = append([]string{"-stdin-config=true"}, args...)
+	}
+	code := command{}.RunCommand("crunch-run", args, &s.stdin, io.MultiWriter(&s.stdout, os.Stderr), io.MultiWriter(&s.stderr, os.Stderr))
+	c.Logf("\n===== stdout =====\n%s", s.stdout.String())
+	c.Logf("\n===== stderr =====\n%s", s.stderr.String())
 	c.Check(code, Equals, 0)
 	err := s.client.RequestAndDecode(&s.cr, "GET", "arvados/v1/container_requests/"+s.cr.UUID, nil, nil)
 	c.Assert(err, IsNil)
@@ -185,6 +238,7 @@ func (s *integrationSuite) testRunTrivialContainer(c *C) {
 			c.Logf("\n===== %s =====\n%s", fi.Name(), buf)
 		}
 	}
+	s.logCollection = log
 
 	var output arvados.Collection
 	err = s.client.RequestAndDecode(&output, "GET", "arvados/v1/collections/"+s.cr.OutputUUID, nil, nil)
@@ -218,4 +272,5 @@ func (s *integrationSuite) testRunTrivialContainer(c *C) {
 			c.Check(fi.Name(), Equals, ".keep")
 		}
 	}
+	s.outputCollection = output
 }

commit 5f95bfc2c5c7706c7961aeca3aabd90ea5661f0a
Author: Tom Clegg <tom@curii.com>
Date:   Thu Oct 7 16:21:48 2021 -0400

    16347: Use health check endpoint to check keepstore readiness.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 7a2afeacc..d70dd1c42 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -1906,26 +1906,34 @@ func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, er
 	if err != nil {
 		return nil, fmt.Errorf("error starting keepstore process: %w", err)
 	}
+	cmdExited := false
+	go func() {
+		cmd.Wait()
+		cmdExited = true
+	}()
 	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))
 	defer cancel()
 	poll := time.NewTicker(time.Second / 10)
 	defer poll.Stop()
 	client := http.Client{}
 	for range poll.C {
-		testReq, err := http.NewRequestWithContext(ctx, "GET", url, nil)
+		testReq, err := http.NewRequestWithContext(ctx, "GET", url+"/_health/ping", nil)
+		testReq.Header.Set("Authorization", "Bearer "+configData.Cluster.ManagementToken)
 		if err != nil {
 			return nil, err
 		}
 		resp, err := client.Do(testReq)
 		if err == nil {
-			// Success -- don't need to check the
-			// response, we just need to know it's
-			// accepting requests.
 			resp.Body.Close()
-			break
+			if resp.StatusCode == http.StatusOK {
+				break
+			}
+		}
+		if cmdExited {
+			return nil, fmt.Errorf("keepstore child process exited")
 		}
 		if ctx.Err() != nil {
-			return nil, fmt.Errorf("timed out waiting for new keepstore process to accept a request")
+			return nil, fmt.Errorf("timed out waiting for new keepstore process to report healthy")
 		}
 	}
 	os.Setenv("ARVADOS_KEEP_SERVICES", url)
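
The change above swaps the plain "accepts a request" check for an
authenticated /_health/ping probe, so crunch-run only proceeds once its
dedicated keepstore reports healthy. A condensed, self-contained sketch of
that readiness-poll pattern (the URL, token, and timeout are placeholder
values, not taken from the patch):

    package main

    import (
        "context"
        "fmt"
        "log"
        "net/http"
        "time"
    )

    // waitHealthy polls url+"/_health/ping" until it returns 200 OK or the
    // deadline passes. The Authorization header carries the management
    // token, as in the patched startLocalKeepstore.
    func waitHealthy(url, token string, timeout time.Duration) error {
        ctx, cancel := context.WithTimeout(context.Background(), timeout)
        defer cancel()
        poll := time.NewTicker(time.Second / 10)
        defer poll.Stop()
        for range poll.C {
            req, err := http.NewRequestWithContext(ctx, "GET", url+"/_health/ping", nil)
            if err != nil {
                return err
            }
            req.Header.Set("Authorization", "Bearer "+token)
            resp, err := http.DefaultClient.Do(req)
            if err == nil {
                resp.Body.Close()
                if resp.StatusCode == http.StatusOK {
                    return nil
                }
            }
            if ctx.Err() != nil {
                return fmt.Errorf("timed out waiting for %s to report healthy", url)
            }
        }
        return nil
    }

    func main() {
        if err := waitHealthy("http://localhost:25107", "example-management-token", 10*time.Second); err != nil {
            log.Fatal(err)
        }
        log.Print("keepstore is healthy")
    }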

commit e04586707b923f857c61a17ad38a9ce795e9af14
Author: Tom Clegg <tom@curii.com>
Date:   Wed Oct 6 15:33:50 2021 -0400

    16347: Update test case to use context instead of CloseNotifier.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index cbb7f38bb..d545bde0a 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -1154,15 +1154,6 @@ func (s *HandlerSuite) TestPutHandlerNoBufferleak(c *check.C) {
 	}
 }
 
-type notifyingResponseRecorder struct {
-	*httptest.ResponseRecorder
-	closer chan bool
-}
-
-func (r *notifyingResponseRecorder) CloseNotify() <-chan bool {
-	return r.closer
-}
-
 func (s *HandlerSuite) TestGetHandlerClientDisconnect(c *check.C) {
 	s.cluster.Collections.BlobSigning = false
 	c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
@@ -1173,23 +1164,15 @@ func (s *HandlerSuite) TestGetHandlerClientDisconnect(c *check.C) {
 	bufs = newBufferPool(ctxlog.TestLogger(c), 1, BlockSize)
 	defer bufs.Put(bufs.Get(BlockSize))
 
-	if err := s.handler.volmgr.AllWritable()[0].Put(context.Background(), TestHash, TestBlock); err != nil {
-		c.Error(err)
-	}
-
-	resp := &notifyingResponseRecorder{
-		ResponseRecorder: httptest.NewRecorder(),
-		closer:           make(chan bool, 1),
-	}
-	if _, ok := http.ResponseWriter(resp).(http.CloseNotifier); !ok {
-		c.Fatal("notifyingResponseRecorder is broken")
-	}
-	// If anyone asks, the client has disconnected.
-	resp.closer <- true
+	err := s.handler.volmgr.AllWritable()[0].Put(context.Background(), TestHash, TestBlock)
+	c.Assert(err, check.IsNil)
 
+	resp := httptest.NewRecorder()
 	ok := make(chan struct{})
 	go func() {
-		req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
+		ctx, cancel := context.WithCancel(context.Background())
+		req, _ := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
+		cancel()
 		s.handler.ServeHTTP(resp, req)
 		ok <- struct{}{}
 	}()
@@ -1200,7 +1183,7 @@ func (s *HandlerSuite) TestGetHandlerClientDisconnect(c *check.C) {
 	case <-ok:
 	}
 
-	ExpectStatusCode(c, "client disconnect", http.StatusServiceUnavailable, resp.ResponseRecorder)
+	ExpectStatusCode(c, "client disconnect", http.StatusServiceUnavailable, resp)
 	for i, v := range s.handler.volmgr.AllWritable() {
 		if calls := v.Volume.(*MockVolume).called["GET"]; calls != 0 {
 			c.Errorf("volume %d got %d calls, expected 0", i, calls)

commit f968e4a8eeb562964f265d7555eaf295f8bac7f6
Author: Tom Clegg <tom@curii.com>
Date:   Wed Oct 6 14:47:19 2021 -0400

    16347: Add new config key to whitelist map.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

diff --git a/lib/config/export.go b/lib/config/export.go
index 92e2d7b4d..f400dabbf 100644
--- a/lib/config/export.go
+++ b/lib/config/export.go
@@ -119,6 +119,7 @@ var whitelist = map[string]bool{
 	"Containers.JobsAPI":                                  true,
 	"Containers.JobsAPI.Enable":                           true,
 	"Containers.JobsAPI.GitInternalDir":                   false,
+	"Containers.LocalKeepBlobBuffersPerVCPU":              false,
 	"Containers.Logging":                                  false,
 	"Containers.LogReuseDecisions":                        false,
 	"Containers.LSF":                                      false,

commit 146c1e2c5d12f478e4dbf35c5a675f05a793f5b7
Author: Tom Clegg <tom@curii.com>
Date:   Wed Oct 6 14:44:19 2021 -0400

    16347: Skip journald logging if systemd-cat is not installed.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

diff --git a/lib/crunchrun/background.go b/lib/crunchrun/background.go
index 07f8f5b88..8a919bc5e 100644
--- a/lib/crunchrun/background.go
+++ b/lib/crunchrun/background.go
@@ -77,20 +77,24 @@ func detach(uuid string, prog string, args []string, stdin io.Reader, stdout io.
 		// invoked as "/path/to/crunch-run"
 		execargs = append([]string{prog}, execargs...)
 	}
-	execargs = append([]string{
-		// Here, if the inner systemd-cat can't exec
-		// crunch-run, it writes an error message to stderr,
-		// and the outer systemd-cat writes it to the journal
-		// where the operator has a chance to discover it. (If
-		// we only used one systemd-cat command, it would be
-		// up to us to report the error -- but we are going to
-		// detach and exit, not wait for something to appear
-		// on stderr.)  Note these systemd-cat calls don't
-		// result in additional processes -- they just connect
-		// stderr/stdout to sockets and call exec().
-		"systemd-cat", "--identifier=crunch-run",
-		"systemd-cat", "--identifier=crunch-run",
-	}, execargs...)
+	if _, err := exec.LookPath("systemd-cat"); err == nil {
+		execargs = append([]string{
+			// Here, if the inner systemd-cat can't exec
+			// crunch-run, it writes an error message to
+			// stderr, and the outer systemd-cat writes it
+			// to the journal where the operator has a
+			// chance to discover it. (If we only used one
+			// systemd-cat command, it would be up to us
+			// to report the error -- but we are going to
+			// detach and exit, not wait for something to
+			// appear on stderr.)  Note these systemd-cat
+			// calls don't result in additional processes
+			// -- they just connect stderr/stdout to
+			// sockets and call exec().
+			"systemd-cat", "--identifier=crunch-run",
+			"systemd-cat", "--identifier=crunch-run",
+		}, execargs...)
+	}
 
 	cmd := exec.Command(execargs[0], execargs[1:]...)
 	// Child inherits lockfile.

commit c6e6a2531ef791e7fdae10b440c778244dd62019
Author: Tom Clegg <tom@curii.com>
Date:   Mon Oct 4 23:25:13 2021 -0400

    16347: Remove closenotifier shim, just use request context.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 29e7b2ca9..2a90705a5 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -112,12 +112,9 @@ func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
 }
 
 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
-	ctx, cancel := contextForResponse(context.TODO(), resp)
-	defer cancel()
-
 	locator := req.URL.Path[1:]
 	if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
-		rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster, rtr.volmgr)
+		rtr.remoteProxy.Get(req.Context(), resp, req, rtr.cluster, rtr.volmgr)
 		return
 	}
 
@@ -136,14 +133,14 @@ func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
 	// isn't here, we can return 404 now instead of waiting for a
 	// buffer.
 
-	buf, err := getBufferWithContext(ctx, bufs, BlockSize)
+	buf, err := getBufferWithContext(req.Context(), bufs, BlockSize)
 	if err != nil {
 		http.Error(resp, err.Error(), http.StatusServiceUnavailable)
 		return
 	}
 	defer bufs.Put(buf)
 
-	size, err := GetBlock(ctx, rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
+	size, err := GetBlock(req.Context(), rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
 	if err != nil {
 		code := http.StatusInternalServerError
 		if err, ok := err.(*KeepError); ok {
@@ -158,21 +155,6 @@ func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
 	resp.Write(buf[:size])
 }
 
-// Return a new context that gets cancelled by resp's CloseNotifier.
-func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
-	ctx, cancel := context.WithCancel(parent)
-	if cn, ok := resp.(http.CloseNotifier); ok {
-		go func(c <-chan bool) {
-			select {
-			case <-c:
-				cancel()
-			case <-ctx.Done():
-			}
-		}(cn.CloseNotify())
-	}
-	return ctx, cancel
-}
-
 // Get a buffer from the pool -- but give up and return a non-nil
 // error if ctx ends before we get a buffer.
 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
@@ -223,9 +205,6 @@ func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
 }
 
 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
-	ctx, cancel := contextForResponse(context.TODO(), resp)
-	defer cancel()
-
 	hash := mux.Vars(req)["hash"]
 
 	// Detect as many error conditions as possible before reading
@@ -262,7 +241,7 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
 		}
 	}
 
-	buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
+	buf, err := getBufferWithContext(req.Context(), bufs, int(req.ContentLength))
 	if err != nil {
 		http.Error(resp, err.Error(), http.StatusServiceUnavailable)
 		return
@@ -275,7 +254,7 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
 		return
 	}
 
-	result, err := PutBlock(ctx, rtr.volmgr, buf, hash, wantStorageClasses)
+	result, err := PutBlock(req.Context(), rtr.volmgr, buf, hash, wantStorageClasses)
 	bufs.Put(buf)
 
 	if err != nil {
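
With the CloseNotifier shim removed above, the keepstore handlers rely on the
request's own context, which net/http cancels when the client disconnects. A
minimal illustrative sketch of that pattern (the handler and port below are
made up for illustration, not keepstore code):

    package main

    import (
        "log"
        "net/http"
        "time"
    )

    // slowHandler stands in for a handler waiting on a scarce resource (e.g.
    // a block buffer). If the client disconnects first, r.Context() is
    // cancelled and the handler gives up instead of tying up the resource.
    func slowHandler(w http.ResponseWriter, r *http.Request) {
        select {
        case <-time.After(2 * time.Second):
            w.Write([]byte("ok\n"))
        case <-r.Context().Done():
            // The client is gone; the 503 may never be delivered, but the
            // handler returns promptly.
            http.Error(w, r.Context().Err().Error(), http.StatusServiceUnavailable)
        }
    }

    func main() {
        http.HandleFunc("/", slowHandler)
        log.Fatal(http.ListenAndServe("localhost:8080", nil))
    }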

commit 9a615740476541185073f98d8bc6e69e06ecf340
Author: Tom Clegg <tom@curii.com>
Date:   Mon Oct 4 23:30:23 2021 -0400

    16347: Fix Azure storage driver log spam.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index cf655c2a5..f9b383e70 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -558,6 +558,9 @@ func (v *AzureBlobVolume) translateError(err error) error {
 	case strings.Contains(err.Error(), "Not Found"):
 		// "storage: service returned without a response body (404 Not Found)"
 		return os.ErrNotExist
+	case strings.Contains(err.Error(), "ErrorCode=BlobNotFound"):
+		// "storage: service returned error: StatusCode=404, ErrorCode=BlobNotFound, ErrorMessage=The specified blob does not exist.\n..."
+		return os.ErrNotExist
 	default:
 		return err
 	}

commit 6b3a880d607ee3e3dd273f019981fd6cae62373c
Author: Tom Clegg <tom@curii.com>
Date:   Mon Oct 4 10:55:49 2021 -0400

    16347: Run a dedicated keepstore process for each container.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 4e2a0e26d..429b9fe48 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -911,6 +911,24 @@ Clusters:
       # Container runtime: "docker" (default) or "singularity"
       RuntimeEngine: docker
 
+      # When running a container, run a dedicated keepstore process,
+      # using the specified number of 64 MiB memory buffers per
+      # allocated CPU core (VCPUs in the container's runtime
+      # constraints). The dedicated keepstore handles I/O for
+      # collections mounted in the container, as well as saving
+      # container logs.
+      #
+      # A zero value disables this feature.
+      #
+      # This feature has security implications. (1) Container logs
+      # will include keepstore log files, which typically reveal some
+      # volume configuration details, error messages from the cloud
+      # storage provider, etc., which are not otherwise visible to
+      # users. (2) The entire cluster configuration file, including
+      # the system root token, is copied to the worker node and held
+      # in memory for the duration of the container.
+      LocalKeepBlobBuffersPerVCPU: 0
+
       Logging:
         # When you run the db:delete_old_container_logs task, it will find
         # containers that have been finished for at least this many seconds,
diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go
index 875939a3e..dfa406bbe 100644
--- a/lib/config/generated_config.go
+++ b/lib/config/generated_config.go
@@ -917,6 +917,24 @@ Clusters:
       # Container runtime: "docker" (default) or "singularity"
       RuntimeEngine: docker
 
+      # When running a container, run a dedicated keepstore process,
+      # using the specified number of 64 MiB memory buffers per
+      # allocated CPU core (VCPUs in the container's runtime
+      # constraints). The dedicated keepstore handles I/O for
+      # collections mounted in the container, as well as saving
+      # container logs.
+      #
+      # A zero value disables this feature.
+      #
+      # This feature has security implications. (1) Container logs
+      # will include keepstore log files, which typically reveal some
+      # volume configuration details, error messages from the cloud
+      # storage provider, etc., which are not otherwise visible to
+      # users. (2) The entire cluster configuration file, including
+      # the system root token, is copied to the worker node and held
+      # in memory for the duration of the container.
+      LocalKeepBlobBuffersPerVCPU: 0
+
       Logging:
         # When you run the db:delete_old_container_logs task, it will find
         # containers that have been finished for at least this many seconds,
diff --git a/lib/config/load.go b/lib/config/load.go
index 248960beb..b6375c820 100644
--- a/lib/config/load.go
+++ b/lib/config/load.go
@@ -297,6 +297,7 @@ func (ldr *Loader) Load() (*arvados.Config, error) {
 			checkKeyConflict(fmt.Sprintf("Clusters.%s.PostgreSQL.Connection", id), cc.PostgreSQL.Connection),
 			ldr.checkEmptyKeepstores(cc),
 			ldr.checkUnlistedKeepstores(cc),
+			ldr.checkLocalKeepstoreVolumes(cc),
 			ldr.checkStorageClasses(cc),
 			// TODO: check non-empty Rendezvous on
 			// services other than Keepstore
@@ -361,6 +362,18 @@ cluster:
 	return nil
 }
 
+func (ldr *Loader) checkLocalKeepstoreVolumes(cc arvados.Cluster) error {
+	if cc.Containers.LocalKeepBlobBuffersPerVCPU < 1 {
+		return nil
+	}
+	for _, vol := range cc.Volumes {
+		if len(vol.AccessViaHosts) == 0 {
+			return nil
+		}
+	}
+	return fmt.Errorf("LocalKeepBlobBuffersPerVCPU is %d, but no volumes would be accessible from a worker instance", cc.Containers.LocalKeepBlobBuffersPerVCPU)
+}
+
 func (ldr *Loader) checkStorageClasses(cc arvados.Cluster) error {
 	classOnVolume := map[string]bool{}
 	for volid, vol := range cc.Volumes {
diff --git a/lib/crunchrun/background.go b/lib/crunchrun/background.go
index 4bb249380..07f8f5b88 100644
--- a/lib/crunchrun/background.go
+++ b/lib/crunchrun/background.go
@@ -36,10 +36,10 @@ type procinfo struct {
 //
 // Stdout and stderr in the child process are sent to the systemd
 // journal using the systemd-cat program.
-func Detach(uuid string, prog string, args []string, stdout, stderr io.Writer) int {
-	return exitcode(stderr, detach(uuid, prog, args, stdout, stderr))
+func Detach(uuid string, prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+	return exitcode(stderr, detach(uuid, prog, args, stdin, stdout))
 }
-func detach(uuid string, prog string, args []string, stdout, stderr io.Writer) error {
+func detach(uuid string, prog string, args []string, stdin io.Reader, stdout io.Writer) error {
 	lockfile, err := func() (*os.File, error) {
 		// We must hold the dir-level lock between
 		// opening/creating the lockfile and acquiring LOCK_EX
@@ -99,10 +99,26 @@ func detach(uuid string, prog string, args []string, stdout, stderr io.Writer) e
 	// from parent (sshd) while sending lockfile content to
 	// caller.
 	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
+	// We need to manage our own OS pipe here to ensure the child
+	// process reads all of our stdin pipe before we return.
+	piper, pipew, err := os.Pipe()
+	if err != nil {
+		return err
+	}
+	defer pipew.Close()
+	cmd.Stdin = piper
 	err = cmd.Start()
 	if err != nil {
 		return fmt.Errorf("exec %s: %s", cmd.Path, err)
 	}
+	_, err = io.Copy(pipew, stdin)
+	if err != nil {
+		return err
+	}
+	err = pipew.Close()
+	if err != nil {
+		return err
+	}
 
 	w := io.MultiWriter(stdout, lockfile)
 	return json.NewEncoder(w).Encode(procinfo{
diff --git a/lib/crunchrun/bufthenwrite.go b/lib/crunchrun/bufthenwrite.go
new file mode 100644
index 000000000..2d1c40716
--- /dev/null
+++ b/lib/crunchrun/bufthenwrite.go
@@ -0,0 +1,34 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package crunchrun
+
+import (
+	"bytes"
+	"io"
+	"sync"
+)
+
+type bufThenWrite struct {
+	buf bytes.Buffer
+	w   io.Writer
+	mtx sync.Mutex
+}
+
+func (btw *bufThenWrite) SetWriter(w io.Writer) error {
+	btw.mtx.Lock()
+	defer btw.mtx.Unlock()
+	btw.w = w
+	_, err := io.Copy(w, &btw.buf)
+	return err
+}
+
+func (btw *bufThenWrite) Write(p []byte) (int, error) {
+	btw.mtx.Lock()
+	defer btw.mtx.Unlock()
+	if btw.w == nil {
+		btw.w = &btw.buf
+	}
+	return btw.w.Write(p)
+}
diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 42f143f1c..7a2afeacc 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -6,6 +6,7 @@ package crunchrun
 
 import (
 	"bytes"
+	"context"
 	"encoding/json"
 	"errors"
 	"flag"
@@ -13,6 +14,8 @@ import (
 	"io"
 	"io/ioutil"
 	"log"
+	"net"
+	"net/http"
 	"os"
 	"os/exec"
 	"os/signal"
@@ -33,13 +36,20 @@ import (
 	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
 	"git.arvados.org/arvados.git/sdk/go/keepclient"
 	"git.arvados.org/arvados.git/sdk/go/manifest"
-	"golang.org/x/net/context"
 )
 
 type command struct{}
 
 var Command = command{}
 
+// ConfigData contains environment variables and (when needed) cluster
+// configuration, passed from dispatchcloud to crunch-run on stdin.
+type ConfigData struct {
+	Env         map[string]string
+	KeepBuffers int
+	Cluster     *arvados.Cluster
+}
+
 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
 type IArvadosClient interface {
 	Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
@@ -1644,7 +1654,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 	cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
 	caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates")
 	detach := flags.Bool("detach", false, "Detach from parent process and run in the background")
-	stdinEnv := flags.Bool("stdin-env", false, "Load environment variables from JSON message on stdin")
+	stdinConfig := flags.Bool("stdin-config", false, "Load config and environment variables from JSON message on stdin")
 	sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
 	kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
 	list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
@@ -1674,33 +1684,45 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 		return 1
 	}
 
-	if *stdinEnv && !ignoreDetachFlag {
-		// Load env vars on stdin if asked (but not in a
-		// detached child process, in which case stdin is
-		// /dev/null).
-		err := loadEnv(os.Stdin)
-		if err != nil {
-			log.Print(err)
-			return 1
-		}
-	}
-
 	containerUUID := flags.Arg(0)
 
 	switch {
 	case *detach && !ignoreDetachFlag:
-		return Detach(containerUUID, prog, args, os.Stdout, os.Stderr)
+		return Detach(containerUUID, prog, args, os.Stdin, os.Stdout, os.Stderr)
 	case *kill >= 0:
 		return KillProcess(containerUUID, syscall.Signal(*kill), os.Stdout, os.Stderr)
 	case *list:
 		return ListProcesses(os.Stdout, os.Stderr)
 	}
 
-	if containerUUID == "" {
+	if len(containerUUID) != 27 {
 		log.Printf("usage: %s [options] UUID", prog)
 		return 1
 	}
 
+	var conf ConfigData
+	if *stdinConfig {
+		err := json.NewDecoder(os.Stdin).Decode(&conf)
+		if err != nil {
+			log.Print(err)
+			return 1
+		}
+		for k, v := range conf.Env {
+			err = os.Setenv(k, v)
+			if err != nil {
+				log.Printf("setenv(%q): %s", k, err)
+				return 1
+			}
+		}
+		if conf.Cluster != nil {
+			// ClusterID is missing from the JSON
+			// representation, but we need it to generate
+			// a valid config file for keepstore, so we
+			// fill it using the container UUID prefix.
+			conf.Cluster.ClusterID = containerUUID[:5]
+		}
+	}
+
 	log.Printf("crunch-run %s started", cmd.Version.String())
 	time.Sleep(*sleep)
 
@@ -1708,6 +1730,16 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 		arvadosclient.CertFiles = []string{*caCertsPath}
 	}
 
+	var keepstoreLog bufThenWrite
+	keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLog, stderr))
+	if err != nil {
+		log.Print(err)
+		return 1
+	}
+	if keepstore != nil {
+		defer keepstore.Process.Kill()
+	}
+
 	api, err := arvadosclient.MakeArvadosClient()
 	if err != nil {
 		log.Printf("%s: %v", containerUUID, err)
@@ -1729,6 +1761,19 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 		return 1
 	}
 
+	if keepstore != nil {
+		w, err := cr.NewLogWriter("keepstore")
+		if err != nil {
+			log.Print(err)
+			return 1
+		}
+		err = keepstoreLog.SetWriter(NewThrottledLogger(w))
+		if err != nil {
+			log.Print(err)
+			return 1
+		}
+	}
+
 	switch *runtimeEngine {
 	case "docker":
 		cr.executor, err = newDockerExecutor(containerUUID, cr.CrunchLog.Printf, cr.containerWatchdogInterval)
@@ -1816,21 +1861,73 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 	return 0
 }
 
-func loadEnv(rdr io.Reader) error {
-	buf, err := ioutil.ReadAll(rdr)
+func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, error) {
+	if configData.Cluster == nil || configData.KeepBuffers < 1 {
+		return nil, nil
+	}
+
+	// Rather than have an alternate way to tell keepstore how
+	// many buffers to use when starting it this way, we just
+	// modify the cluster configuration that we feed it on stdin.
+	configData.Cluster.API.MaxKeepBlobBuffers = configData.KeepBuffers
+
+	ln, err := net.Listen("tcp", "localhost:0")
+	if err != nil {
+		return nil, err
+	}
+	_, port, err := net.SplitHostPort(ln.Addr().String())
+	if err != nil {
+		ln.Close()
+		return nil, err
+	}
+	ln.Close()
+	url := "http://localhost:" + port
+
+	fmt.Fprintf(logbuf, "starting keepstore on %s\n", url)
+
+	var confJSON bytes.Buffer
+	err = json.NewEncoder(&confJSON).Encode(arvados.Config{
+		Clusters: map[string]arvados.Cluster{
+			configData.Cluster.ClusterID: *configData.Cluster,
+		},
+	})
 	if err != nil {
-		return fmt.Errorf("read stdin: %s", err)
+		return nil, err
+	}
+	cmd := exec.Command("/proc/self/exe", "keepstore", "-config=-")
+	cmd.Stdin = &confJSON
+	cmd.Stdout = logbuf
+	cmd.Stderr = logbuf
+	cmd.Env = []string{
+		"GOGC=10",
+		"ARVADOS_SERVICE_INTERNAL_URL=" + url,
 	}
-	var env map[string]string
-	err = json.Unmarshal(buf, &env)
+	err = cmd.Start()
 	if err != nil {
-		return fmt.Errorf("decode stdin: %s", err)
+		return nil, fmt.Errorf("error starting keepstore process: %w", err)
 	}
-	for k, v := range env {
-		err = os.Setenv(k, v)
+	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))
+	defer cancel()
+	poll := time.NewTicker(time.Second / 10)
+	defer poll.Stop()
+	client := http.Client{}
+	for range poll.C {
+		testReq, err := http.NewRequestWithContext(ctx, "GET", url, nil)
 		if err != nil {
-			return fmt.Errorf("setenv(%q): %s", k, err)
+			return nil, err
+		}
+		resp, err := client.Do(testReq)
+		if err == nil {
+			// Success -- don't need to check the
+			// response, we just need to know it's
+			// accepting requests.
+			resp.Body.Close()
+			break
+		}
+		if ctx.Err() != nil {
+			return nil, fmt.Errorf("timed out waiting for new keepstore process to accept a request")
 		}
 	}
-	return nil
+	os.Setenv("ARVADOS_KEEP_SERVICES", url)
+	return cmd, nil
 }
diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
index 1b31a71a2..f57db0f09 100644
--- a/lib/dispatchcloud/test/stub_driver.go
+++ b/lib/dispatchcloud/test/stub_driver.go
@@ -18,6 +18,7 @@ import (
 	"time"
 
 	"git.arvados.org/arvados.git/lib/cloud"
+	"git.arvados.org/arvados.git/lib/crunchrun"
 	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"github.com/sirupsen/logrus"
 	"golang.org/x/crypto/ssh"
@@ -193,7 +194,7 @@ type StubVM struct {
 	ArvMountDeadlockRate  float64
 	ExecuteContainer      func(arvados.Container) int
 	CrashRunningContainer func(arvados.Container)
-	ExtraCrunchRunArgs    string // extra args expected after "crunch-run --detach --stdin-env "
+	ExtraCrunchRunArgs    string // extra args expected after "crunch-run --detach --stdin-config "
 
 	sis          *StubInstanceSet
 	id           cloud.InstanceID
@@ -252,15 +253,15 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
 		fmt.Fprint(stderr, "crunch-run: command not found\n")
 		return 1
 	}
-	if strings.HasPrefix(command, "crunch-run --detach --stdin-env "+svm.ExtraCrunchRunArgs) {
-		var stdinKV map[string]string
-		err := json.Unmarshal(stdinData, &stdinKV)
+	if strings.HasPrefix(command, "crunch-run --detach --stdin-config "+svm.ExtraCrunchRunArgs) {
+		var configData crunchrun.ConfigData
+		err := json.Unmarshal(stdinData, &configData)
 		if err != nil {
 			fmt.Fprintf(stderr, "unmarshal stdin: %s (stdin was: %q)\n", err, stdinData)
 			return 1
 		}
 		for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
-			if stdinKV[name] == "" {
+			if configData.Env[name] == "" {
 				fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdinData)
 				return 1
 			}
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index a5924cf99..37e3fa988 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -103,6 +103,7 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
 		instanceSetID:                  instanceSetID,
 		instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
 		newExecutor:                    newExecutor,
+		cluster:                        cluster,
 		bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
 		runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
 		imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
@@ -144,6 +145,7 @@ type Pool struct {
 	instanceSetID                  cloud.InstanceSetID
 	instanceSet                    *throttledInstanceSet
 	newExecutor                    func(cloud.Instance) Executor
+	cluster                        *arvados.Cluster
 	bootProbeCommand               string
 	runnerSource                   string
 	imageID                        cloud.ImageID
diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
index 0f5c5ee19..7b5634605 100644
--- a/lib/dispatchcloud/worker/pool_test.go
+++ b/lib/dispatchcloud/worker/pool_test.go
@@ -10,10 +10,12 @@ import (
 	"time"
 
 	"git.arvados.org/arvados.git/lib/cloud"
+	"git.arvados.org/arvados.git/lib/config"
 	"git.arvados.org/arvados.git/lib/dispatchcloud/test"
 	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"git.arvados.org/arvados.git/sdk/go/ctxlog"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/sirupsen/logrus"
 	check "gopkg.in/check.v1"
 )
 
@@ -31,7 +33,18 @@ func (*lessChecker) Check(params []interface{}, names []string) (result bool, er
 
 var less = &lessChecker{&check.CheckerInfo{Name: "less", Params: []string{"obtained", "expected"}}}
 
-type PoolSuite struct{}
+type PoolSuite struct {
+	logger      logrus.FieldLogger
+	testCluster *arvados.Cluster
+}
+
+func (suite *PoolSuite) SetUpTest(c *check.C) {
+	suite.logger = ctxlog.TestLogger(c)
+	cfg, err := config.NewLoader(nil, suite.logger).Load()
+	c.Assert(err, check.IsNil)
+	suite.testCluster, err = cfg.GetCluster("")
+	c.Assert(err, check.IsNil)
+}
 
 func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
 	type1 := test.InstanceType(1)
@@ -63,10 +76,9 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
 		}
 	}
 
-	logger := ctxlog.TestLogger(c)
 	driver := &test.StubDriver{}
 	instanceSetID := cloud.InstanceSetID("test-instance-set-id")
-	is, err := driver.InstanceSet(nil, instanceSetID, nil, logger)
+	is, err := driver.InstanceSet(nil, instanceSetID, nil, suite.logger)
 	c.Assert(err, check.IsNil)
 
 	newExecutor := func(cloud.Instance) Executor {
@@ -78,25 +90,21 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
 		}
 	}
 
-	cluster := &arvados.Cluster{
-		Containers: arvados.ContainersConfig{
-			CloudVMs: arvados.CloudVMsConfig{
-				BootProbeCommand:   "true",
-				MaxProbesPerSecond: 1000,
-				ProbeInterval:      arvados.Duration(time.Millisecond * 10),
-				SyncInterval:       arvados.Duration(time.Millisecond * 10),
-				TagKeyPrefix:       "testprefix:",
-			},
-			CrunchRunCommand: "crunch-run-custom",
-		},
-		InstanceTypes: arvados.InstanceTypeMap{
-			type1.Name: type1,
-			type2.Name: type2,
-			type3.Name: type3,
-		},
+	suite.testCluster.Containers.CloudVMs = arvados.CloudVMsConfig{
+		BootProbeCommand:   "true",
+		MaxProbesPerSecond: 1000,
+		ProbeInterval:      arvados.Duration(time.Millisecond * 10),
+		SyncInterval:       arvados.Duration(time.Millisecond * 10),
+		TagKeyPrefix:       "testprefix:",
+	}
+	suite.testCluster.Containers.CrunchRunCommand = "crunch-run-custom"
+	suite.testCluster.InstanceTypes = arvados.InstanceTypeMap{
+		type1.Name: type1,
+		type2.Name: type2,
+		type3.Name: type3,
 	}
 
-	pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
+	pool := NewPool(suite.logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, suite.testCluster)
 	notify := pool.Subscribe()
 	defer pool.Unsubscribe(notify)
 	pool.Create(type1)
@@ -111,7 +119,7 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
 		}
 	}
 	// Wait for the tags to save to the cloud provider
-	tagKey := cluster.Containers.CloudVMs.TagKeyPrefix + tagKeyIdleBehavior
+	tagKey := suite.testCluster.Containers.CloudVMs.TagKeyPrefix + tagKeyIdleBehavior
 	deadline := time.Now().Add(time.Second)
 	for !func() bool {
 		pool.mtx.RLock()
@@ -132,7 +140,7 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
 
 	c.Log("------- starting new pool, waiting to recover state")
 
-	pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
+	pool2 := NewPool(suite.logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, suite.testCluster)
 	notify2 := pool2.Subscribe()
 	defer pool2.Unsubscribe(notify2)
 	waitForIdle(pool2, notify2)
@@ -148,9 +156,8 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
 }
 
 func (suite *PoolSuite) TestDrain(c *check.C) {
-	logger := ctxlog.TestLogger(c)
 	driver := test.StubDriver{}
-	instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
+	instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
 	c.Assert(err, check.IsNil)
 
 	ac := arvados.NewClientFromEnv()
@@ -158,8 +165,9 @@ func (suite *PoolSuite) TestDrain(c *check.C) {
 	type1 := test.InstanceType(1)
 	pool := &Pool{
 		arvClient:   ac,
-		logger:      logger,
+		logger:      suite.logger,
 		newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
+		cluster:     suite.testCluster,
 		instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
 		instanceTypes: arvados.InstanceTypeMap{
 			type1.Name: type1,
@@ -201,15 +209,15 @@ func (suite *PoolSuite) TestDrain(c *check.C) {
 }
 
 func (suite *PoolSuite) TestNodeCreateThrottle(c *check.C) {
-	logger := ctxlog.TestLogger(c)
 	driver := test.StubDriver{HoldCloudOps: true}
-	instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
+	instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
 	c.Assert(err, check.IsNil)
 
 	type1 := test.InstanceType(1)
 	pool := &Pool{
-		logger:                         logger,
+		logger:                         suite.logger,
 		instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
+		cluster:                        suite.testCluster,
 		maxConcurrentInstanceCreateOps: 1,
 		instanceTypes: arvados.InstanceTypeMap{
 			type1.Name: type1,
@@ -241,17 +249,17 @@ func (suite *PoolSuite) TestNodeCreateThrottle(c *check.C) {
 }
 
 func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
-	logger := ctxlog.TestLogger(c)
 	driver := test.StubDriver{HoldCloudOps: true}
-	instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
+	instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
 	c.Assert(err, check.IsNil)
 
 	type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
 	type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
 	type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
 	pool := &Pool{
-		logger:      logger,
+		logger:      suite.logger,
 		newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
+		cluster:     suite.testCluster,
 		instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
 		instanceTypes: arvados.InstanceTypeMap{
 			type1.Name: type1,
diff --git a/lib/dispatchcloud/worker/runner.go b/lib/dispatchcloud/worker/runner.go
index 63561874c..29c4b8e0a 100644
--- a/lib/dispatchcloud/worker/runner.go
+++ b/lib/dispatchcloud/worker/runner.go
@@ -13,6 +13,7 @@ import (
 	"syscall"
 	"time"
 
+	"git.arvados.org/arvados.git/lib/crunchrun"
 	"github.com/sirupsen/logrus"
 )
 
@@ -21,7 +22,7 @@ import (
 type remoteRunner struct {
 	uuid          string
 	executor      Executor
-	envJSON       json.RawMessage
+	configJSON    json.RawMessage
 	runnerCmd     string
 	runnerArgs    []string
 	remoteUser    string
@@ -47,7 +48,8 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
 	if err := enc.Encode(wkr.instType); err != nil {
 		panic(err)
 	}
-	env := map[string]string{
+	var configData crunchrun.ConfigData
+	configData.Env = map[string]string{
 		"ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
 		"ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
 		"InstanceType":      instJSON.String(),
@@ -55,16 +57,20 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
 		"GatewayAuthSecret": wkr.wp.gatewayAuthSecret(uuid),
 	}
 	if wkr.wp.arvClient.Insecure {
-		env["ARVADOS_API_HOST_INSECURE"] = "1"
+		configData.Env["ARVADOS_API_HOST_INSECURE"] = "1"
 	}
-	envJSON, err := json.Marshal(env)
+	if bufs := wkr.wp.cluster.Containers.LocalKeepBlobBuffersPerVCPU; bufs > 0 {
+		configData.Cluster = wkr.wp.cluster
+		configData.KeepBuffers = bufs * wkr.instType.VCPUs
+	}
+	configJSON, err := json.Marshal(configData)
 	if err != nil {
 		panic(err)
 	}
 	rr := &remoteRunner{
 		uuid:          uuid,
 		executor:      wkr.executor,
-		envJSON:       envJSON,
+		configJSON:    configJSON,
 		runnerCmd:     wkr.wp.runnerCmd,
 		runnerArgs:    wkr.wp.runnerArgs,
 		remoteUser:    wkr.instance.RemoteUser(),
@@ -84,7 +90,7 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
 // assume the remote process _might_ have started, at least until it
 // probes the worker and finds otherwise.
 func (rr *remoteRunner) Start() {
-	cmd := rr.runnerCmd + " --detach --stdin-env"
+	cmd := rr.runnerCmd + " --detach --stdin-config"
 	for _, arg := range rr.runnerArgs {
 		cmd += " '" + strings.Replace(arg, "'", "'\\''", -1) + "'"
 	}
@@ -92,7 +98,7 @@ func (rr *remoteRunner) Start() {
 	if rr.remoteUser != "root" {
 		cmd = "sudo " + cmd
 	}
-	stdin := bytes.NewBuffer(rr.envJSON)
+	stdin := bytes.NewBuffer(rr.configJSON)
 	stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin)
 	if err != nil {
 		rr.logger.WithField("stdout", string(stdout)).
diff --git a/lib/dispatchcloud/worker/worker_test.go b/lib/dispatchcloud/worker/worker_test.go
index 4134788b2..2ee6b7c36 100644
--- a/lib/dispatchcloud/worker/worker_test.go
+++ b/lib/dispatchcloud/worker/worker_test.go
@@ -14,24 +14,36 @@ import (
 	"time"
 
 	"git.arvados.org/arvados.git/lib/cloud"
+	"git.arvados.org/arvados.git/lib/config"
 	"git.arvados.org/arvados.git/lib/dispatchcloud/test"
 	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"git.arvados.org/arvados.git/sdk/go/ctxlog"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/sirupsen/logrus"
 	check "gopkg.in/check.v1"
 )
 
 var _ = check.Suite(&WorkerSuite{})
 
-type WorkerSuite struct{}
+type WorkerSuite struct {
+	logger      logrus.FieldLogger
+	testCluster *arvados.Cluster
+}
+
+func (suite *WorkerSuite) SetUpTest(c *check.C) {
+	suite.logger = ctxlog.TestLogger(c)
+	cfg, err := config.NewLoader(nil, suite.logger).Load()
+	c.Assert(err, check.IsNil)
+	suite.testCluster, err = cfg.GetCluster("")
+	c.Assert(err, check.IsNil)
+}
 
 func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
-	logger := ctxlog.TestLogger(c)
 	bootTimeout := time.Minute
 	probeTimeout := time.Second
 
 	ac := arvados.NewClientFromEnv()
-	is, err := (&test.StubDriver{}).InstanceSet(nil, "test-instance-set-id", nil, logger)
+	is, err := (&test.StubDriver{}).InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
 	c.Assert(err, check.IsNil)
 	inst, err := is.Create(arvados.InstanceType{}, "", nil, "echo InitCommand", nil)
 	c.Assert(err, check.IsNil)
@@ -232,6 +244,7 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
 		wp := &Pool{
 			arvClient:        ac,
 			newExecutor:      func(cloud.Instance) Executor { return exr },
+			cluster:          suite.testCluster,
 			bootProbeCommand: "bootprobe",
 			timeoutBooting:   bootTimeout,
 			timeoutProbe:     probeTimeout,
@@ -249,7 +262,7 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
 			exr.response[wp.runnerCmd+" --list"] = trial.respRunDeployed
 		}
 		wkr := &worker{
-			logger:   logger,
+			logger:   suite.logger,
 			executor: exr,
 			wp:       wp,
 			mtx:      &wp.mtx,
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index f1d27b8dc..b84e1eefa 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -435,6 +435,7 @@ type ContainersConfig struct {
 	SupportedDockerImageFormats StringSet
 	UsePreemptibleInstances     bool
 	RuntimeEngine               string
+	LocalKeepBlobBuffersPerVCPU int
 
 	JobsAPI struct {
 		Enable         string
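
Across this batch, dispatchcloud now invokes crunch-run with
"--detach --stdin-config" and writes a single JSON document to its stdin:
the environment variables, the keepstore buffer count, and (when
LocalKeepBlobBuffersPerVCPU > 0) the full cluster config. A sketch of that
document, with the Cluster field omitted and example values only:

    package main

    import (
        "encoding/json"
        "os"
    )

    // ConfigData mirrors the fields of lib/crunchrun's ConfigData type; the
    // Cluster field (*arvados.Cluster) is left out to keep the sketch
    // self-contained.
    type ConfigData struct {
        Env         map[string]string
        KeepBuffers int
    }

    func main() {
        // Roughly what dispatchcloud's newRemoteRunner marshals onto stdin
        // for "crunch-run --detach --stdin-config <uuid>". Values are examples.
        json.NewEncoder(os.Stdout).Encode(ConfigData{
            Env: map[string]string{
                "ARVADOS_API_HOST":  "zzzzz.arvadosapi.com",
                "ARVADOS_API_TOKEN": "exampletoken",
            },
            KeepBuffers: 2, // LocalKeepBlobBuffersPerVCPU * instance VCPUs
        })
    }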

-----------------------------------------------------------------------


hooks/post-receive
-- 



