[ARVADOS] created: 2.1.0-1421-g0fc72d87c
Git user <git at public.arvados.org>
Tue Oct 5 03:33:12 UTC 2021
at 0fc72d87c64606685f9882c4f1dc76ef071290a3 (commit)
commit 0fc72d87c64606685f9882c4f1dc76ef071290a3
Author: Tom Clegg <tom at curii.com>
Date: Mon Oct 4 23:25:13 2021 -0400
16347: Remove closenotifier shim, just use request context.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
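
Background on this change (not part of the commit): since Go 1.8, the context returned by req.Context() is cancelled automatically when the client disconnects, so the hand-rolled http.CloseNotifier goroutine is redundant. A minimal, illustrative sketch of the pattern -- handler name and timings are made up, not Arvados code:

    package main

    import (
        "fmt"
        "log"
        "net/http"
        "time"
    )

    // exampleHandler passes the request context to downstream work; the
    // context is cancelled if the client goes away, replacing CloseNotifier.
    func exampleHandler(w http.ResponseWriter, req *http.Request) {
        ctx := req.Context()
        select {
        case <-time.After(5 * time.Second):
            fmt.Fprintln(w, "done")
        case <-ctx.Done():
            log.Printf("request cancelled: %v", ctx.Err())
        }
    }

    func main() {
        http.HandleFunc("/", exampleHandler)
        log.Fatal(http.ListenAndServe("localhost:8080", nil))
    }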
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 29e7b2ca9..2a90705a5 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -112,12 +112,9 @@ func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
}
func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
- ctx, cancel := contextForResponse(context.TODO(), resp)
- defer cancel()
-
locator := req.URL.Path[1:]
if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
- rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster, rtr.volmgr)
+ rtr.remoteProxy.Get(req.Context(), resp, req, rtr.cluster, rtr.volmgr)
return
}
@@ -136,14 +133,14 @@ func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
// isn't here, we can return 404 now instead of waiting for a
// buffer.
- buf, err := getBufferWithContext(ctx, bufs, BlockSize)
+ buf, err := getBufferWithContext(req.Context(), bufs, BlockSize)
if err != nil {
http.Error(resp, err.Error(), http.StatusServiceUnavailable)
return
}
defer bufs.Put(buf)
- size, err := GetBlock(ctx, rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
+ size, err := GetBlock(req.Context(), rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
if err != nil {
code := http.StatusInternalServerError
if err, ok := err.(*KeepError); ok {
@@ -158,21 +155,6 @@ func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
resp.Write(buf[:size])
}
-// Return a new context that gets cancelled by resp's CloseNotifier.
-func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
- ctx, cancel := context.WithCancel(parent)
- if cn, ok := resp.(http.CloseNotifier); ok {
- go func(c <-chan bool) {
- select {
- case <-c:
- cancel()
- case <-ctx.Done():
- }
- }(cn.CloseNotify())
- }
- return ctx, cancel
-}
-
// Get a buffer from the pool -- but give up and return a non-nil
// error if ctx ends before we get a buffer.
func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
@@ -223,9 +205,6 @@ func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
}
func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
- ctx, cancel := contextForResponse(context.TODO(), resp)
- defer cancel()
-
hash := mux.Vars(req)["hash"]
// Detect as many error conditions as possible before reading
@@ -262,7 +241,7 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
}
}
- buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
+ buf, err := getBufferWithContext(req.Context(), bufs, int(req.ContentLength))
if err != nil {
http.Error(resp, err.Error(), http.StatusServiceUnavailable)
return
@@ -275,7 +254,7 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
return
}
- result, err := PutBlock(ctx, rtr.volmgr, buf, hash, wantStorageClasses)
+ result, err := PutBlock(req.Context(), rtr.volmgr, buf, hash, wantStorageClasses)
bufs.Put(buf)
if err != nil {
commit 1669db0664af5a96ce60771c4c43ad9ff9d6112c
Author: Tom Clegg <tom at curii.com>
Date: Mon Oct 4 23:30:23 2021 -0400
16347: Fix Azure storage driver log spam.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
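
Background (not part of the commit): the Azure SDK reports a missing blob in more than one textual form, and translateError folds them all into os.ErrNotExist so callers treat the condition as an ordinary 404 instead of logging the raw provider error. A simplified, standalone sketch of the same idea, with illustrative error text:

    package main

    import (
        "errors"
        "fmt"
        "os"
        "strings"
    )

    // translateNotFound normalizes provider errors that indicate a missing
    // blob to os.ErrNotExist, mirroring the translateError change above.
    func translateNotFound(err error) error {
        if err == nil {
            return nil
        }
        for _, marker := range []string{"Not Found", "ErrorCode=BlobNotFound"} {
            if strings.Contains(err.Error(), marker) {
                return os.ErrNotExist
            }
        }
        return err
    }

    func main() {
        raw := errors.New("storage: service returned error: StatusCode=404, ErrorCode=BlobNotFound")
        fmt.Println(os.IsNotExist(translateNotFound(raw))) // true
    }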
diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index cf655c2a5..f9b383e70 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -558,6 +558,9 @@ func (v *AzureBlobVolume) translateError(err error) error {
case strings.Contains(err.Error(), "Not Found"):
// "storage: service returned without a response body (404 Not Found)"
return os.ErrNotExist
+ case strings.Contains(err.Error(), "ErrorCode=BlobNotFound"):
+ // "storage: service returned error: StatusCode=404, ErrorCode=BlobNotFound, ErrorMessage=The specified blob does not exist.\n..."
+ return os.ErrNotExist
default:
return err
}
commit 38c5dedd7a16becb35ca2598731bd575dc49fe2a
Author: Tom Clegg <tom at curii.com>
Date: Mon Oct 4 10:55:49 2021 -0400
16347: Run a dedicated keepstore process for each container.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 5bd575b12..a776459ca 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -911,6 +911,24 @@ Clusters:
# Container runtime: "docker" (default) or "singularity" (experimental)
RuntimeEngine: docker
+ # When running a container, run a dedicated keepstore process,
+ # using the specified number of 64 MiB memory buffers per
+ # allocated CPU core (VCPUs in the container's runtime
+ # constraints). The dedicated keepstore handles I/O for
+ # collections mounted in the container, as well as saving
+ # container logs.
+ #
+ # A zero value disables this feature.
+ #
+ # This feature has security implications. (1) Container logs
+ # will include keepstore log files, which typically reveal some
+ # volume configuration details, error messages from the cloud
+ # storage provider, etc., which are not otherwise visible to
+ # users. (2) The entire cluster configuration file, including
+ # the system root token, is copied to the worker node and held
+ # in memory for the duration of the container.
+ LocalKeepBlobBuffersPerVCPU: 0
+
Logging:
# When you run the db:delete_old_container_logs task, it will find
# containers that have been finished for at least this many seconds,
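
As an illustration only (the value below is made up, not a recommended default), a cluster operator enabling the new feature might set something like the following in the cluster config:

    Clusters:
      zzzzz:
        Containers:
          # One 64 MiB buffer per VCPU: a 4-VCPU instance type would run a
          # local keepstore with 4 buffers (256 MiB) for container I/O.
          LocalKeepBlobBuffersPerVCPU: 1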
diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go
index 4e24a5c7f..4725777b3 100644
--- a/lib/config/generated_config.go
+++ b/lib/config/generated_config.go
@@ -917,6 +917,24 @@ Clusters:
# Container runtime: "docker" (default) or "singularity" (experimental)
RuntimeEngine: docker
+ # When running a container, run a dedicated keepstore process,
+ # using the specified number of 64 MiB memory buffers per
+ # allocated CPU core (VCPUs in the container's runtime
+ # constraints). The dedicated keepstore handles I/O for
+ # collections mounted in the container, as well as saving
+ # container logs.
+ #
+ # A zero value disables this feature.
+ #
+ # This feature has security implications. (1) Container logs
+ # will include keepstore log files, which typically reveal some
+ # volume configuration details, error messages from the cloud
+ # storage provider, etc., which are not otherwise visible to
+ # users. (2) The entire cluster configuration file, including
+ # the system root token, is copied to the worker node and held
+ # in memory for the duration of the container.
+ LocalKeepBlobBuffersPerVCPU: 0
+
Logging:
# When you run the db:delete_old_container_logs task, it will find
# containers that have been finished for at least this many seconds,
diff --git a/lib/config/load.go b/lib/config/load.go
index 248960beb..b6375c820 100644
--- a/lib/config/load.go
+++ b/lib/config/load.go
@@ -297,6 +297,7 @@ func (ldr *Loader) Load() (*arvados.Config, error) {
checkKeyConflict(fmt.Sprintf("Clusters.%s.PostgreSQL.Connection", id), cc.PostgreSQL.Connection),
ldr.checkEmptyKeepstores(cc),
ldr.checkUnlistedKeepstores(cc),
+ ldr.checkLocalKeepstoreVolumes(cc),
ldr.checkStorageClasses(cc),
// TODO: check non-empty Rendezvous on
// services other than Keepstore
@@ -361,6 +362,18 @@ cluster:
return nil
}
+func (ldr *Loader) checkLocalKeepstoreVolumes(cc arvados.Cluster) error {
+ if cc.Containers.LocalKeepBlobBuffersPerVCPU < 1 {
+ return nil
+ }
+ for _, vol := range cc.Volumes {
+ if len(vol.AccessViaHosts) == 0 {
+ return nil
+ }
+ }
+ return fmt.Errorf("LocalKeepBlobBuffersPerVCPU is %d, but no volumes would be accessible from a worker instance", cc.Containers.LocalKeepBlobBuffersPerVCPU)
+}
+
func (ldr *Loader) checkStorageClasses(cc arvados.Cluster) error {
classOnVolume := map[string]bool{}
for volid, vol := range cc.Volumes {
diff --git a/lib/crunchrun/background.go b/lib/crunchrun/background.go
index 4bb249380..07f8f5b88 100644
--- a/lib/crunchrun/background.go
+++ b/lib/crunchrun/background.go
@@ -36,10 +36,10 @@ type procinfo struct {
//
// Stdout and stderr in the child process are sent to the systemd
// journal using the systemd-cat program.
-func Detach(uuid string, prog string, args []string, stdout, stderr io.Writer) int {
- return exitcode(stderr, detach(uuid, prog, args, stdout, stderr))
+func Detach(uuid string, prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+ return exitcode(stderr, detach(uuid, prog, args, stdin, stdout))
}
-func detach(uuid string, prog string, args []string, stdout, stderr io.Writer) error {
+func detach(uuid string, prog string, args []string, stdin io.Reader, stdout io.Writer) error {
lockfile, err := func() (*os.File, error) {
// We must hold the dir-level lock between
// opening/creating the lockfile and acquiring LOCK_EX
@@ -99,10 +99,26 @@ func detach(uuid string, prog string, args []string, stdout, stderr io.Writer) e
// from parent (sshd) while sending lockfile content to
// caller.
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
+ // We need to manage our own OS pipe here to ensure the child
+ // process reads all of our stdin pipe before we return.
+ piper, pipew, err := os.Pipe()
+ if err != nil {
+ return err
+ }
+ defer pipew.Close()
+ cmd.Stdin = piper
err = cmd.Start()
if err != nil {
return fmt.Errorf("exec %s: %s", cmd.Path, err)
}
+ _, err = io.Copy(pipew, stdin)
+ if err != nil {
+ return err
+ }
+ err = pipew.Close()
+ if err != nil {
+ return err
+ }
w := io.MultiWriter(stdout, lockfile)
return json.NewEncoder(w).Encode(procinfo{
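
Background (not part of the commit): Detach now copies its own stdin into an explicit OS pipe that becomes the child's stdin, so the complete config JSON is delivered, and the write end closed, before the parent returns, even though the child is detached. A self-contained sketch of that hand-off, using "cat" as a stand-in for the detached child:

    package main

    import (
        "bytes"
        "io"
        "os"
        "os/exec"
    )

    func main() {
        payload := bytes.NewBufferString(`{"hello": "world"}` + "\n")

        // Explicit pipe: the child reads from piper, the parent writes to pipew.
        piper, pipew, err := os.Pipe()
        if err != nil {
            panic(err)
        }

        cmd := exec.Command("cat") // stand-in for the detached child process
        cmd.Stdin = piper
        cmd.Stdout = os.Stdout
        if err := cmd.Start(); err != nil {
            panic(err)
        }

        // Write everything, then close the write end so the child sees EOF.
        if _, err := io.Copy(pipew, payload); err != nil {
            panic(err)
        }
        pipew.Close()

        cmd.Wait()
    }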
diff --git a/lib/crunchrun/bufthenwrite.go b/lib/crunchrun/bufthenwrite.go
new file mode 100644
index 000000000..2d1c40716
--- /dev/null
+++ b/lib/crunchrun/bufthenwrite.go
@@ -0,0 +1,34 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package crunchrun
+
+import (
+ "bytes"
+ "io"
+ "sync"
+)
+
+type bufThenWrite struct {
+ buf bytes.Buffer
+ w io.Writer
+ mtx sync.Mutex
+}
+
+func (btw *bufThenWrite) SetWriter(w io.Writer) error {
+ btw.mtx.Lock()
+ defer btw.mtx.Unlock()
+ btw.w = w
+ _, err := io.Copy(w, &btw.buf)
+ return err
+}
+
+func (btw *bufThenWrite) Write(p []byte) (int, error) {
+ btw.mtx.Lock()
+ defer btw.mtx.Unlock()
+ if btw.w == nil {
+ btw.w = &btw.buf
+ }
+ return btw.w.Write(p)
+}
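
Background (not part of the commit): bufThenWrite buffers writes in memory until a real destination exists; SetWriter then replays the backlog, and later writes pass straight through. crunch-run uses it so the local keepstore can start logging before the container's "keepstore" log writer has been created. A rough usage sketch, assuming it sits alongside the type above in package crunchrun:

    package crunchrun

    import (
        "fmt"
        "os"
    )

    // exampleBufThenWrite shows the intended lifecycle: early writes are
    // buffered, SetWriter flushes them, later writes pass straight through.
    func exampleBufThenWrite() error {
        var btw bufThenWrite
        fmt.Fprintln(&btw, "keepstore starting...") // buffered; no writer yet
        if err := btw.SetWriter(os.Stdout); err != nil {
            return err // SetWriter also replays the buffered lines
        }
        fmt.Fprintln(&btw, "keepstore ready") // written through to os.Stdout
        return nil
    }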
diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 42f143f1c..7a2afeacc 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -6,6 +6,7 @@ package crunchrun
import (
"bytes"
+ "context"
"encoding/json"
"errors"
"flag"
@@ -13,6 +14,8 @@ import (
"io"
"io/ioutil"
"log"
+ "net"
+ "net/http"
"os"
"os/exec"
"os/signal"
@@ -33,13 +36,20 @@ import (
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/keepclient"
"git.arvados.org/arvados.git/sdk/go/manifest"
- "golang.org/x/net/context"
)
type command struct{}
var Command = command{}
+// ConfigData contains environment variables and (when needed) cluster
+// configuration, passed from dispatchcloud to crunch-run on stdin.
+type ConfigData struct {
+ Env map[string]string
+ KeepBuffers int
+ Cluster *arvados.Cluster
+}
+
// IArvadosClient is the minimal Arvados API methods used by crunch-run.
type IArvadosClient interface {
Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
@@ -1644,7 +1654,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates")
detach := flags.Bool("detach", false, "Detach from parent process and run in the background")
- stdinEnv := flags.Bool("stdin-env", false, "Load environment variables from JSON message on stdin")
+ stdinConfig := flags.Bool("stdin-config", false, "Load config and environment variables from JSON message on stdin")
sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
@@ -1674,33 +1684,45 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
return 1
}
- if *stdinEnv && !ignoreDetachFlag {
- // Load env vars on stdin if asked (but not in a
- // detached child process, in which case stdin is
- // /dev/null).
- err := loadEnv(os.Stdin)
- if err != nil {
- log.Print(err)
- return 1
- }
- }
-
containerUUID := flags.Arg(0)
switch {
case *detach && !ignoreDetachFlag:
- return Detach(containerUUID, prog, args, os.Stdout, os.Stderr)
+ return Detach(containerUUID, prog, args, os.Stdin, os.Stdout, os.Stderr)
case *kill >= 0:
return KillProcess(containerUUID, syscall.Signal(*kill), os.Stdout, os.Stderr)
case *list:
return ListProcesses(os.Stdout, os.Stderr)
}
- if containerUUID == "" {
+ if len(containerUUID) != 27 {
log.Printf("usage: %s [options] UUID", prog)
return 1
}
+ var conf ConfigData
+ if *stdinConfig {
+ err := json.NewDecoder(os.Stdin).Decode(&conf)
+ if err != nil {
+ log.Print(err)
+ return 1
+ }
+ for k, v := range conf.Env {
+ err = os.Setenv(k, v)
+ if err != nil {
+ log.Printf("setenv(%q): %s", k, err)
+ return 1
+ }
+ }
+ if conf.Cluster != nil {
+ // ClusterID is missing from the JSON
+ // representation, but we need it to generate
+ // a valid config file for keepstore, so we
+ // fill it using the container UUID prefix.
+ conf.Cluster.ClusterID = containerUUID[:5]
+ }
+ }
+
log.Printf("crunch-run %s started", cmd.Version.String())
time.Sleep(*sleep)
@@ -1708,6 +1730,16 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
arvadosclient.CertFiles = []string{*caCertsPath}
}
+ var keepstoreLog bufThenWrite
+ keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLog, stderr))
+ if err != nil {
+ log.Print(err)
+ return 1
+ }
+ if keepstore != nil {
+ defer keepstore.Process.Kill()
+ }
+
api, err := arvadosclient.MakeArvadosClient()
if err != nil {
log.Printf("%s: %v", containerUUID, err)
@@ -1729,6 +1761,19 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
return 1
}
+ if keepstore != nil {
+ w, err := cr.NewLogWriter("keepstore")
+ if err != nil {
+ log.Print(err)
+ return 1
+ }
+ err = keepstoreLog.SetWriter(NewThrottledLogger(w))
+ if err != nil {
+ log.Print(err)
+ return 1
+ }
+ }
+
switch *runtimeEngine {
case "docker":
cr.executor, err = newDockerExecutor(containerUUID, cr.CrunchLog.Printf, cr.containerWatchdogInterval)
@@ -1816,21 +1861,73 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
return 0
}
-func loadEnv(rdr io.Reader) error {
- buf, err := ioutil.ReadAll(rdr)
+func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, error) {
+ if configData.Cluster == nil || configData.KeepBuffers < 1 {
+ return nil, nil
+ }
+
+ // Rather than have an alternate way to tell keepstore how
+ // many buffers to use when starting it this way, we just
+ // modify the cluster configuration that we feed it on stdin.
+ configData.Cluster.API.MaxKeepBlobBuffers = configData.KeepBuffers
+
+ ln, err := net.Listen("tcp", "localhost:0")
+ if err != nil {
+ return nil, err
+ }
+ _, port, err := net.SplitHostPort(ln.Addr().String())
+ if err != nil {
+ ln.Close()
+ return nil, err
+ }
+ ln.Close()
+ url := "http://localhost:" + port
+
+ fmt.Fprintf(logbuf, "starting keepstore on %s\n", url)
+
+ var confJSON bytes.Buffer
+ err = json.NewEncoder(&confJSON).Encode(arvados.Config{
+ Clusters: map[string]arvados.Cluster{
+ configData.Cluster.ClusterID: *configData.Cluster,
+ },
+ })
if err != nil {
- return fmt.Errorf("read stdin: %s", err)
+ return nil, err
+ }
+ cmd := exec.Command("/proc/self/exe", "keepstore", "-config=-")
+ cmd.Stdin = &confJSON
+ cmd.Stdout = logbuf
+ cmd.Stderr = logbuf
+ cmd.Env = []string{
+ "GOGC=10",
+ "ARVADOS_SERVICE_INTERNAL_URL=" + url,
}
- var env map[string]string
- err = json.Unmarshal(buf, &env)
+ err = cmd.Start()
if err != nil {
- return fmt.Errorf("decode stdin: %s", err)
+ return nil, fmt.Errorf("error starting keepstore process: %w", err)
}
- for k, v := range env {
- err = os.Setenv(k, v)
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))
+ defer cancel()
+ poll := time.NewTicker(time.Second / 10)
+ defer poll.Stop()
+ client := http.Client{}
+ for range poll.C {
+ testReq, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
- return fmt.Errorf("setenv(%q): %s", k, err)
+ return nil, err
+ }
+ resp, err := client.Do(testReq)
+ if err == nil {
+ // Success -- don't need to check the
+ // response, we just need to know it's
+ // accepting requests.
+ resp.Body.Close()
+ break
+ }
+ if ctx.Err() != nil {
+ return nil, fmt.Errorf("timed out waiting for new keepstore process to accept a request")
}
}
- return nil
+ os.Setenv("ARVADOS_KEEP_SERVICES", url)
+ return cmd, nil
}
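
Two design points worth noting, for context: re-executing /proc/self/exe with the "keepstore" argument presumably relies on the crunch-run binary bundling the keepstore command, so no separate keepstore binary needs to be deployed to the worker; and ARVADOS_KEEP_SERVICES is the environment variable the Go SDK's keep client reads to find keep services, here pointed at the per-container process. The startup loop above simply waits until the new process answers any HTTP request; a standalone sketch of that readiness pattern, with illustrative names and timings:

    package main

    import (
        "context"
        "fmt"
        "net/http"
        "time"
    )

    // waitReady polls url until it accepts any HTTP request or the timeout
    // expires -- the same readiness check startLocalKeepstore performs.
    func waitReady(url string, timeout time.Duration) error {
        ctx, cancel := context.WithTimeout(context.Background(), timeout)
        defer cancel()
        tick := time.NewTicker(100 * time.Millisecond)
        defer tick.Stop()
        for range tick.C {
            req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
            if err != nil {
                return err
            }
            if resp, err := http.DefaultClient.Do(req); err == nil {
                resp.Body.Close() // any response at all means it is listening
                return nil
            }
            if ctx.Err() != nil {
                return fmt.Errorf("timed out waiting for %s", url)
            }
        }
        return nil
    }

    func main() {
        fmt.Println(waitReady("http://localhost:9999", 2*time.Second))
    }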
diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
index 1b31a71a2..f57db0f09 100644
--- a/lib/dispatchcloud/test/stub_driver.go
+++ b/lib/dispatchcloud/test/stub_driver.go
@@ -18,6 +18,7 @@ import (
"time"
"git.arvados.org/arvados.git/lib/cloud"
+ "git.arvados.org/arvados.git/lib/crunchrun"
"git.arvados.org/arvados.git/sdk/go/arvados"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
@@ -193,7 +194,7 @@ type StubVM struct {
ArvMountDeadlockRate float64
ExecuteContainer func(arvados.Container) int
CrashRunningContainer func(arvados.Container)
- ExtraCrunchRunArgs string // extra args expected after "crunch-run --detach --stdin-env "
+ ExtraCrunchRunArgs string // extra args expected after "crunch-run --detach --stdin-config "
sis *StubInstanceSet
id cloud.InstanceID
@@ -252,15 +253,15 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
fmt.Fprint(stderr, "crunch-run: command not found\n")
return 1
}
- if strings.HasPrefix(command, "crunch-run --detach --stdin-env "+svm.ExtraCrunchRunArgs) {
- var stdinKV map[string]string
- err := json.Unmarshal(stdinData, &stdinKV)
+ if strings.HasPrefix(command, "crunch-run --detach --stdin-config "+svm.ExtraCrunchRunArgs) {
+ var configData crunchrun.ConfigData
+ err := json.Unmarshal(stdinData, &configData)
if err != nil {
fmt.Fprintf(stderr, "unmarshal stdin: %s (stdin was: %q)\n", err, stdinData)
return 1
}
for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
- if stdinKV[name] == "" {
+ if configData.Env[name] == "" {
fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdinData)
return 1
}
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index a5924cf99..37e3fa988 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -103,6 +103,7 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
instanceSetID: instanceSetID,
instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
newExecutor: newExecutor,
+ cluster: cluster,
bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
runnerSource: cluster.Containers.CloudVMs.DeployRunnerBinary,
imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
@@ -144,6 +145,7 @@ type Pool struct {
instanceSetID cloud.InstanceSetID
instanceSet *throttledInstanceSet
newExecutor func(cloud.Instance) Executor
+ cluster *arvados.Cluster
bootProbeCommand string
runnerSource string
imageID cloud.ImageID
diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
index 0f5c5ee19..7b5634605 100644
--- a/lib/dispatchcloud/worker/pool_test.go
+++ b/lib/dispatchcloud/worker/pool_test.go
@@ -10,10 +10,12 @@ import (
"time"
"git.arvados.org/arvados.git/lib/cloud"
+ "git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/lib/dispatchcloud/test"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
check "gopkg.in/check.v1"
)
@@ -31,7 +33,18 @@ func (*lessChecker) Check(params []interface{}, names []string) (result bool, er
var less = &lessChecker{&check.CheckerInfo{Name: "less", Params: []string{"obtained", "expected"}}}
-type PoolSuite struct{}
+type PoolSuite struct {
+ logger logrus.FieldLogger
+ testCluster *arvados.Cluster
+}
+
+func (suite *PoolSuite) SetUpTest(c *check.C) {
+ suite.logger = ctxlog.TestLogger(c)
+ cfg, err := config.NewLoader(nil, suite.logger).Load()
+ c.Assert(err, check.IsNil)
+ suite.testCluster, err = cfg.GetCluster("")
+ c.Assert(err, check.IsNil)
+}
func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
type1 := test.InstanceType(1)
@@ -63,10 +76,9 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
}
}
- logger := ctxlog.TestLogger(c)
driver := &test.StubDriver{}
instanceSetID := cloud.InstanceSetID("test-instance-set-id")
- is, err := driver.InstanceSet(nil, instanceSetID, nil, logger)
+ is, err := driver.InstanceSet(nil, instanceSetID, nil, suite.logger)
c.Assert(err, check.IsNil)
newExecutor := func(cloud.Instance) Executor {
@@ -78,25 +90,21 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
}
}
- cluster := &arvados.Cluster{
- Containers: arvados.ContainersConfig{
- CloudVMs: arvados.CloudVMsConfig{
- BootProbeCommand: "true",
- MaxProbesPerSecond: 1000,
- ProbeInterval: arvados.Duration(time.Millisecond * 10),
- SyncInterval: arvados.Duration(time.Millisecond * 10),
- TagKeyPrefix: "testprefix:",
- },
- CrunchRunCommand: "crunch-run-custom",
- },
- InstanceTypes: arvados.InstanceTypeMap{
- type1.Name: type1,
- type2.Name: type2,
- type3.Name: type3,
- },
+ suite.testCluster.Containers.CloudVMs = arvados.CloudVMsConfig{
+ BootProbeCommand: "true",
+ MaxProbesPerSecond: 1000,
+ ProbeInterval: arvados.Duration(time.Millisecond * 10),
+ SyncInterval: arvados.Duration(time.Millisecond * 10),
+ TagKeyPrefix: "testprefix:",
+ }
+ suite.testCluster.Containers.CrunchRunCommand = "crunch-run-custom"
+ suite.testCluster.InstanceTypes = arvados.InstanceTypeMap{
+ type1.Name: type1,
+ type2.Name: type2,
+ type3.Name: type3,
}
- pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
+ pool := NewPool(suite.logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, suite.testCluster)
notify := pool.Subscribe()
defer pool.Unsubscribe(notify)
pool.Create(type1)
@@ -111,7 +119,7 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
}
}
// Wait for the tags to save to the cloud provider
- tagKey := cluster.Containers.CloudVMs.TagKeyPrefix + tagKeyIdleBehavior
+ tagKey := suite.testCluster.Containers.CloudVMs.TagKeyPrefix + tagKeyIdleBehavior
deadline := time.Now().Add(time.Second)
for !func() bool {
pool.mtx.RLock()
@@ -132,7 +140,7 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
c.Log("------- starting new pool, waiting to recover state")
- pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
+ pool2 := NewPool(suite.logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, suite.testCluster)
notify2 := pool2.Subscribe()
defer pool2.Unsubscribe(notify2)
waitForIdle(pool2, notify2)
@@ -148,9 +156,8 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
}
func (suite *PoolSuite) TestDrain(c *check.C) {
- logger := ctxlog.TestLogger(c)
driver := test.StubDriver{}
- instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
+ instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
c.Assert(err, check.IsNil)
ac := arvados.NewClientFromEnv()
@@ -158,8 +165,9 @@ func (suite *PoolSuite) TestDrain(c *check.C) {
type1 := test.InstanceType(1)
pool := &Pool{
arvClient: ac,
- logger: logger,
+ logger: suite.logger,
newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
+ cluster: suite.testCluster,
instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
instanceTypes: arvados.InstanceTypeMap{
type1.Name: type1,
@@ -201,15 +209,15 @@ func (suite *PoolSuite) TestDrain(c *check.C) {
}
func (suite *PoolSuite) TestNodeCreateThrottle(c *check.C) {
- logger := ctxlog.TestLogger(c)
driver := test.StubDriver{HoldCloudOps: true}
- instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
+ instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
c.Assert(err, check.IsNil)
type1 := test.InstanceType(1)
pool := &Pool{
- logger: logger,
+ logger: suite.logger,
instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
+ cluster: suite.testCluster,
maxConcurrentInstanceCreateOps: 1,
instanceTypes: arvados.InstanceTypeMap{
type1.Name: type1,
@@ -241,17 +249,17 @@ func (suite *PoolSuite) TestNodeCreateThrottle(c *check.C) {
}
func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
- logger := ctxlog.TestLogger(c)
driver := test.StubDriver{HoldCloudOps: true}
- instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
+ instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
c.Assert(err, check.IsNil)
type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
pool := &Pool{
- logger: logger,
+ logger: suite.logger,
newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
+ cluster: suite.testCluster,
instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
instanceTypes: arvados.InstanceTypeMap{
type1.Name: type1,
diff --git a/lib/dispatchcloud/worker/runner.go b/lib/dispatchcloud/worker/runner.go
index 63561874c..29c4b8e0a 100644
--- a/lib/dispatchcloud/worker/runner.go
+++ b/lib/dispatchcloud/worker/runner.go
@@ -13,6 +13,7 @@ import (
"syscall"
"time"
+ "git.arvados.org/arvados.git/lib/crunchrun"
"github.com/sirupsen/logrus"
)
@@ -21,7 +22,7 @@ import (
type remoteRunner struct {
uuid string
executor Executor
- envJSON json.RawMessage
+ configJSON json.RawMessage
runnerCmd string
runnerArgs []string
remoteUser string
@@ -47,7 +48,8 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
if err := enc.Encode(wkr.instType); err != nil {
panic(err)
}
- env := map[string]string{
+ var configData crunchrun.ConfigData
+ configData.Env = map[string]string{
"ARVADOS_API_HOST": wkr.wp.arvClient.APIHost,
"ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
"InstanceType": instJSON.String(),
@@ -55,16 +57,20 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
"GatewayAuthSecret": wkr.wp.gatewayAuthSecret(uuid),
}
if wkr.wp.arvClient.Insecure {
- env["ARVADOS_API_HOST_INSECURE"] = "1"
+ configData.Env["ARVADOS_API_HOST_INSECURE"] = "1"
}
- envJSON, err := json.Marshal(env)
+ if bufs := wkr.wp.cluster.Containers.LocalKeepBlobBuffersPerVCPU; bufs > 0 {
+ configData.Cluster = wkr.wp.cluster
+ configData.KeepBuffers = bufs * wkr.instType.VCPUs
+ }
+ configJSON, err := json.Marshal(configData)
if err != nil {
panic(err)
}
rr := &remoteRunner{
uuid: uuid,
executor: wkr.executor,
- envJSON: envJSON,
+ configJSON: configJSON,
runnerCmd: wkr.wp.runnerCmd,
runnerArgs: wkr.wp.runnerArgs,
remoteUser: wkr.instance.RemoteUser(),
@@ -84,7 +90,7 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
// assume the remote process _might_ have started, at least until it
// probes the worker and finds otherwise.
func (rr *remoteRunner) Start() {
- cmd := rr.runnerCmd + " --detach --stdin-env"
+ cmd := rr.runnerCmd + " --detach --stdin-config"
for _, arg := range rr.runnerArgs {
cmd += " '" + strings.Replace(arg, "'", "'\\''", -1) + "'"
}
@@ -92,7 +98,7 @@ func (rr *remoteRunner) Start() {
if rr.remoteUser != "root" {
cmd = "sudo " + cmd
}
- stdin := bytes.NewBuffer(rr.envJSON)
+ stdin := bytes.NewBuffer(rr.configJSON)
stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin)
if err != nil {
rr.logger.WithField("stdout", string(stdout)).
diff --git a/lib/dispatchcloud/worker/worker_test.go b/lib/dispatchcloud/worker/worker_test.go
index 4134788b2..2ee6b7c36 100644
--- a/lib/dispatchcloud/worker/worker_test.go
+++ b/lib/dispatchcloud/worker/worker_test.go
@@ -14,24 +14,36 @@ import (
"time"
"git.arvados.org/arvados.git/lib/cloud"
+ "git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/lib/dispatchcloud/test"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
check "gopkg.in/check.v1"
)
var _ = check.Suite(&WorkerSuite{})
-type WorkerSuite struct{}
+type WorkerSuite struct {
+ logger logrus.FieldLogger
+ testCluster *arvados.Cluster
+}
+
+func (suite *WorkerSuite) SetUpTest(c *check.C) {
+ suite.logger = ctxlog.TestLogger(c)
+ cfg, err := config.NewLoader(nil, suite.logger).Load()
+ c.Assert(err, check.IsNil)
+ suite.testCluster, err = cfg.GetCluster("")
+ c.Assert(err, check.IsNil)
+}
func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
- logger := ctxlog.TestLogger(c)
bootTimeout := time.Minute
probeTimeout := time.Second
ac := arvados.NewClientFromEnv()
- is, err := (&test.StubDriver{}).InstanceSet(nil, "test-instance-set-id", nil, logger)
+ is, err := (&test.StubDriver{}).InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
c.Assert(err, check.IsNil)
inst, err := is.Create(arvados.InstanceType{}, "", nil, "echo InitCommand", nil)
c.Assert(err, check.IsNil)
@@ -232,6 +244,7 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
wp := &Pool{
arvClient: ac,
newExecutor: func(cloud.Instance) Executor { return exr },
+ cluster: suite.testCluster,
bootProbeCommand: "bootprobe",
timeoutBooting: bootTimeout,
timeoutProbe: probeTimeout,
@@ -249,7 +262,7 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
exr.response[wp.runnerCmd+" --list"] = trial.respRunDeployed
}
wkr := &worker{
- logger: logger,
+ logger: suite.logger,
executor: exr,
wp: wp,
mtx: &wp.mtx,
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index f1d27b8dc..b84e1eefa 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -435,6 +435,7 @@ type ContainersConfig struct {
SupportedDockerImageFormats StringSet
UsePreemptibleInstances bool
RuntimeEngine string
+ LocalKeepBlobBuffersPerVCPU int
JobsAPI struct {
Enable string
-----------------------------------------------------------------------
hooks/post-receive