[ARVADOS] created: 1.3.0-2009-gdd9367afe
Git user
git at public.arvados.org
Mon Dec 30 17:03:29 UTC 2019
at dd9367afefff5d0cd38d1549e32e2794e4614fb4 (commit)
commit dd9367afefff5d0cd38d1549e32e2794e4614fb4
Author: Tom Clegg <tom at tomclegg.ca>
Date: Mon Dec 30 11:25:23 2019 -0500
15759: Improve logs.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index ed8f06394..357ac20a0 100644
--- a/lib/dispatchcloud/worker/worker.go
+++ b/lib/dispatchcloud/worker/worker.go
@@ -369,7 +369,6 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
return false, stderr2
} else {
- wkr.logger.Info("runner binary OK")
stderr = append(stderr, stderr2...)
}
return true, stderr
@@ -378,9 +377,14 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
+ logger := wkr.logger.WithFields(logrus.Fields{
+ "hash": hash,
+ "path": wkr.wp.runnerCmd,
+ })
stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+" "+wkr.wp.runnerCmd+"\n")) {
+ logger.Info("runner binary already exists on worker, with correct hash")
return
}
@@ -391,6 +395,7 @@ func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
if wkr.instance.RemoteUser() != "root" {
cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
}
+ logger.WithField("cmd", cmd).Info("installing runner binary on worker")
stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
return
}
commit 08ee2ab7699e7fee77f6c428977762cb12287166
Author: Tom Clegg <tom at tomclegg.ca>
Date: Mon Dec 30 11:09:13 2019 -0500
15759: Update file mode; avoid /var/run, typically mounted noexec.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 34bc26dea..c52422c41 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -790,7 +790,7 @@ func (wp *Pool) loadRunnerData() error {
}
wp.runnerData = buf
wp.runnerMD5 = md5.Sum(buf)
- wp.runnerCmd = fmt.Sprintf("/var/run/arvados/crunch-run~%x", wp.runnerMD5)
+ wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
return nil
}
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index a455a7d06..ed8f06394 100644
--- a/lib/dispatchcloud/worker/worker.go
+++ b/lib/dispatchcloud/worker/worker.go
@@ -387,7 +387,7 @@ func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
// Note touch+chmod come before writing data, to avoid the
// possibility of md5 being correct while file mode is
// incorrect.
- cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0700 "$dstfile"; cat >"$dstfile"`
+ cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"`
if wkr.instance.RemoteUser() != "root" {
cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
}
commit c347c99a6ffe9d72f6decf29d2553bc6caf126c2
Author: Tom Clegg <tom at tomclegg.ca>
Date: Mon Dec 30 10:55:38 2019 -0500
15759: Add DeployRunnerBinary to config.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 6bc269c7e..34bc26dea 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -102,6 +102,7 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
newExecutor: newExecutor,
bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
+ runnerSource: cluster.Containers.CloudVMs.DeployRunnerBinary,
imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
instanceTypes: cluster.InstanceTypes,
maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index 433e361e1..ca39d6b26 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -390,6 +390,7 @@ type CloudVMsConfig struct {
Enable bool
BootProbeCommand string
+ DeployRunnerBinary string
ImageID string
MaxCloudOpsPerSecond int
MaxProbesPerSecond int
commit a3c7d9e03062e3246b0857fbae05f45d22e39169
Merge: da3cd46f9 949abe8b7
Author: Tom Clegg <tom at tomclegg.ca>
Date: Mon Dec 30 10:18:15 2019 -0500
15759: Merge branch 'master' into 15759-deploy-crunch-run
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>
commit da3cd46f9204f522b73023b42e8a4d04f697e9c7
Author: Tom Clegg <tom at tomclegg.ca>
Date: Mon Dec 30 10:10:30 2019 -0500
15759: Deploy supervisor binary from dispatcher to worker VMs.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>
diff --git a/lib/cmd/cmd.go b/lib/cmd/cmd.go
index 51bcf55c7..611c95d23 100644
--- a/lib/cmd/cmd.go
+++ b/lib/cmd/cmd.go
@@ -65,6 +65,11 @@ type Multi map[string]Handler
func (m Multi) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
_, basename := filepath.Split(prog)
+ if i := strings.Index(basename, "~"); i >= 0 {
+ // drop "~anything" suffix (arvados-dispatch-cloud's
+ // DeployRunnerBinary feature relies on this)
+ basename = basename[:i]
+ }
cmd, ok := m[basename]
if !ok {
// "controller" command exists, and binary is named "arvados-controller"
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 7779d7ebf..ccc6343e6 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -788,6 +788,16 @@ Clusters:
# Worker VM image ID.
ImageID: ""
+ # An executable file (located on the dispatcher host) to be
+ # copied to cloud instances at runtime and used as the
+ # container runner/supervisor. The default value is the
+ # dispatcher program itself.
+ #
+ # Use the empty string to disable this step: nothing will be
+ # copied, and cloud instances are assumed to have a suitable
+ # version of crunch-run installed.
+ DeployRunnerBinary: "/proc/self/exe"
+
# Tags to add on all resources (VMs, NICs, disks) created by
# the container dispatcher. (Arvados's own tags --
# InstanceType, IdleBehavior, and InstanceSecret -- will also
diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go
index 3e58d249b..3d4c18e28 100644
--- a/lib/config/generated_config.go
+++ b/lib/config/generated_config.go
@@ -794,6 +794,16 @@ Clusters:
# Worker VM image ID.
ImageID: ""
+ # An executable file (located on the dispatcher host) to be
+ # copied to cloud instances at runtime and used as the
+ # container runner/supervisor. The default value is the
+ # dispatcher program itself.
+ #
+ # Use the empty string to disable this step: nothing will be
+ # copied, and cloud instances are assumed to have a suitable
+ # version of crunch-run installed.
+ DeployRunnerBinary: "/proc/self/exe"
+
# Tags to add on all resources (VMs, NICs, disks) created by
# the container dispatcher. (Arvados's own tags --
# InstanceType, IdleBehavior, and InstanceSecret -- will also
diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
index ce7dc0730..19c8f6e0b 100644
--- a/lib/dispatchcloud/dispatcher.go
+++ b/lib/dispatchcloud/dispatcher.go
@@ -37,6 +37,7 @@ const (
type pool interface {
scheduler.WorkerPool
+ CheckHealth() error
Instances() []worker.InstanceView
SetIdleBehavior(cloud.InstanceID, worker.IdleBehavior) error
KillInstance(id cloud.InstanceID, reason string) error
@@ -78,7 +79,7 @@ func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// CheckHealth implements service.Handler.
func (disp *dispatcher) CheckHealth() error {
disp.Start()
- return nil
+ return disp.pool.CheckHealth()
}
// Stop dispatching containers and release resources. Typically used
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 649910f63..6bc269c7e 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -5,10 +5,12 @@
package worker
import (
+ "crypto/md5"
"crypto/rand"
"errors"
"fmt"
"io"
+ "io/ioutil"
"sort"
"strings"
"sync"
@@ -135,6 +137,7 @@ type Pool struct {
instanceSet *throttledInstanceSet
newExecutor func(cloud.Instance) Executor
bootProbeCommand string
+ runnerSource string
imageID cloud.ImageID
instanceTypes map[string]arvados.InstanceType
syncInterval time.Duration
@@ -160,6 +163,9 @@ type Pool struct {
stop chan bool
mtx sync.RWMutex
setupOnce sync.Once
+ runnerData []byte
+ runnerMD5 [md5.Size]byte
+ runnerCmd string
throttleCreate throttle
throttleInstances throttle
@@ -177,6 +183,14 @@ type createCall struct {
instanceType arvados.InstanceType
}
+func (wp *Pool) CheckHealth() error {
+ wp.setupOnce.Do(wp.setup)
+ if err := wp.loadRunnerData(); err != nil {
+ return fmt.Errorf("error loading runner binary: %s", err)
+ }
+ return nil
+}
+
// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
@@ -276,6 +290,10 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
func (wp *Pool) Create(it arvados.InstanceType) bool {
logger := wp.logger.WithField("InstanceType", it.Name)
wp.setupOnce.Do(wp.setup)
+ if wp.loadRunnerData() != nil {
+ // Boot probe is certain to fail.
+ return false
+ }
wp.mtx.Lock()
defer wp.mtx.Unlock()
if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
@@ -743,6 +761,36 @@ func (wp *Pool) setup() {
wp.exited = map[string]time.Time{}
wp.workers = map[cloud.InstanceID]*worker{}
wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
+ wp.loadRunnerData()
+}
+
+// Load the runner program to be deployed on worker nodes into
+// wp.runnerData, if necessary. Errors are logged.
+//
+// If auto-deploy is disabled, len(wp.runnerData) will be 0.
+//
+// Caller must not have lock.
+func (wp *Pool) loadRunnerData() error {
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
+ if wp.runnerData != nil {
+ return nil
+ } else if wp.runnerSource == "" {
+ wp.runnerCmd = "crunch-run"
+ wp.runnerData = []byte{}
+ return nil
+ }
+ logger := wp.logger.WithField("source", wp.runnerSource)
+ logger.Debug("loading runner")
+ buf, err := ioutil.ReadFile(wp.runnerSource)
+ if err != nil {
+ logger.WithError(err).Error("failed to load runner program")
+ return err
+ }
+ wp.runnerData = buf
+ wp.runnerMD5 = md5.Sum(buf)
+ wp.runnerCmd = fmt.Sprintf("/var/run/arvados/crunch-run~%x", wp.runnerMD5)
+ return nil
}
func (wp *Pool) notify() {
diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
index 8b2415b40..1948c1e87 100644
--- a/lib/dispatchcloud/worker/pool_test.go
+++ b/lib/dispatchcloud/worker/pool_test.go
@@ -70,9 +70,11 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
c.Assert(err, check.IsNil)
newExecutor := func(cloud.Instance) Executor {
- return stubExecutor{
- "crunch-run --list": stubResp{},
- "true": stubResp{},
+ return &stubExecutor{
+ response: map[string]stubResp{
+ "crunch-run --list": stubResp{},
+ "true": stubResp{},
+ },
}
}
@@ -155,7 +157,7 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
pool := &Pool{
logger: logger,
- newExecutor: func(cloud.Instance) Executor { return stubExecutor{} },
+ newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
instanceTypes: arvados.InstanceTypeMap{
type1.Name: type1,
diff --git a/lib/dispatchcloud/worker/runner.go b/lib/dispatchcloud/worker/runner.go
index e819a6036..475212134 100644
--- a/lib/dispatchcloud/worker/runner.go
+++ b/lib/dispatchcloud/worker/runner.go
@@ -20,6 +20,7 @@ type remoteRunner struct {
uuid string
executor Executor
envJSON json.RawMessage
+ runnerCmd string
remoteUser string
timeoutTERM time.Duration
timeoutSignal time.Duration
@@ -59,6 +60,7 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
uuid: uuid,
executor: wkr.executor,
envJSON: envJSON,
+ runnerCmd: wkr.wp.runnerCmd,
remoteUser: wkr.instance.RemoteUser(),
timeoutTERM: wkr.wp.timeoutTERM,
timeoutSignal: wkr.wp.timeoutSignal,
@@ -76,7 +78,7 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
// assume the remote process _might_ have started, at least until it
// probes the worker and finds otherwise.
func (rr *remoteRunner) Start() {
- cmd := "crunch-run --detach --stdin-env '" + rr.uuid + "'"
+ cmd := rr.runnerCmd + " --detach --stdin-env '" + rr.uuid + "'"
if rr.remoteUser != "root" {
cmd = "sudo " + cmd
}
@@ -136,7 +138,7 @@ func (rr *remoteRunner) Kill(reason string) {
func (rr *remoteRunner) kill(sig syscall.Signal) {
logger := rr.logger.WithField("Signal", int(sig))
logger.Info("sending signal")
- cmd := fmt.Sprintf("crunch-run --kill %d %s", sig, rr.uuid)
+ cmd := fmt.Sprintf(rr.runnerCmd+" --kill %d %s", sig, rr.uuid)
if rr.remoteUser != "root" {
cmd = "sudo " + cmd
}
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index cda5a14b1..a455a7d06 100644
--- a/lib/dispatchcloud/worker/worker.go
+++ b/lib/dispatchcloud/worker/worker.go
@@ -5,7 +5,9 @@
package worker
import (
+ "bytes"
"fmt"
+ "path/filepath"
"strings"
"sync"
"time"
@@ -318,7 +320,7 @@ func (wkr *worker) probeAndUpdate() {
}
func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
- cmd := "crunch-run --list"
+ cmd := wkr.wp.runnerCmd + " --list"
if u := wkr.instance.RemoteUser(); u != "root" {
cmd = "sudo " + cmd
}
@@ -358,9 +360,41 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
return false, stderr
}
logger.Info("boot probe succeeded")
+ if err = wkr.wp.loadRunnerData(); err != nil {
+ wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
+ return false, stderr
+ } else if len(wkr.wp.runnerData) == 0 {
+ // Assume crunch-run is already installed
+ } else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
+ wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
+ return false, stderr2
+ } else {
+ wkr.logger.Info("runner binary OK")
+ stderr = append(stderr, stderr2...)
+ }
return true, stderr
}
+func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
+ hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
+ dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
+
+ stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
+ if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+" "+wkr.wp.runnerCmd+"\n")) {
+ return
+ }
+
+ // Note touch+chmod come before writing data, to avoid the
+ // possibility of md5 being correct while file mode is
+ // incorrect.
+ cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0700 "$dstfile"; cat >"$dstfile"`
+ if wkr.instance.RemoteUser() != "root" {
+ cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
+ }
+ stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
+ return
+}
+
// caller must have lock.
func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
if wkr.idleBehavior == IdleBehaviorHold {
diff --git a/lib/dispatchcloud/worker/worker_test.go b/lib/dispatchcloud/worker/worker_test.go
index 4f8f77bff..a4c2a6370 100644
--- a/lib/dispatchcloud/worker/worker_test.go
+++ b/lib/dispatchcloud/worker/worker_test.go
@@ -5,8 +5,12 @@
package worker
import (
+ "bytes"
+ "crypto/md5"
"errors"
+ "fmt"
"io"
+ "strings"
"time"
"git.arvados.org/arvados.git/lib/cloud"
@@ -38,7 +42,11 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
running int
starting int
respBoot stubResp // zero value is success
+ respDeploy stubResp // zero value is success
respRun stubResp // zero value is success + nothing running
+ respRunDeployed stubResp
+ deployRunner []byte
+ expectStdin []byte
expectState State
expectRunning int
}
@@ -46,7 +54,7 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
errFail := errors.New("failed")
respFail := stubResp{"", "command failed\n", errFail}
respContainerRunning := stubResp{"zzzzz-dz642-abcdefghijklmno\n", "", nil}
- for _, trial := range []trialT{
+ for idx, trial := range []trialT{
{
testCaseComment: "Unknown, probes fail",
state: StateUnknown,
@@ -185,12 +193,40 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
starting: 1,
expectState: StateRunning,
},
+ {
+ testCaseComment: "Booting, boot probe succeeds, deployRunner succeeds, run probe succeeds",
+ state: StateBooting,
+ deployRunner: []byte("ELF"),
+ expectStdin: []byte("ELF"),
+ respRun: respFail,
+ respRunDeployed: respContainerRunning,
+ expectRunning: 1,
+ expectState: StateRunning,
+ },
+ {
+ testCaseComment: "Booting, boot probe succeeds, deployRunner fails",
+ state: StateBooting,
+ deployRunner: []byte("ELF"),
+ respDeploy: respFail,
+ expectStdin: []byte("ELF"),
+ expectState: StateBooting,
+ },
+ {
+ testCaseComment: "Booting, boot probe succeeds, deployRunner skipped, run probe succeeds",
+ state: StateBooting,
+ deployRunner: nil,
+ respDeploy: respFail,
+ expectState: StateIdle,
+ },
} {
- c.Logf("------- %#v", trial)
+ c.Logf("------- trial %d: %#v", idx, trial)
ctime := time.Now().Add(-trial.age)
- exr := stubExecutor{
- "bootprobe": trial.respBoot,
- "crunch-run --list": trial.respRun,
+ exr := &stubExecutor{
+ response: map[string]stubResp{
+ "bootprobe": trial.respBoot,
+ "crunch-run --list": trial.respRun,
+ "{deploy}": trial.respDeploy,
+ },
}
wp := &Pool{
arvClient: ac,
@@ -199,6 +235,14 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
timeoutBooting: bootTimeout,
timeoutProbe: probeTimeout,
exited: map[string]time.Time{},
+ runnerCmd: "crunch-run",
+ runnerData: trial.deployRunner,
+ runnerMD5: md5.Sum(trial.deployRunner),
+ }
+ if trial.deployRunner != nil {
+ svHash := md5.Sum(trial.deployRunner)
+ wp.runnerCmd = fmt.Sprintf("/var/run/arvados/crunch-run~%x", svHash)
+ exr.response[wp.runnerCmd+" --list"] = trial.respRunDeployed
}
wkr := &worker{
logger: logger,
@@ -226,6 +270,7 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
wkr.probeAndUpdate()
c.Check(wkr.state, check.Equals, trial.expectState)
c.Check(len(wkr.running), check.Equals, trial.expectRunning)
+ c.Check(exr.stdin.String(), check.Equals, string(trial.expectStdin))
}
}
@@ -234,14 +279,27 @@ type stubResp struct {
stderr string
err error
}
-type stubExecutor map[string]stubResp
-func (se stubExecutor) SetTarget(cloud.ExecutorTarget) {}
-func (se stubExecutor) Close() {}
-func (se stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error) {
- resp, ok := se[cmd]
+type stubExecutor struct {
+ response map[string]stubResp
+ stdin bytes.Buffer
+}
+
+func (se *stubExecutor) SetTarget(cloud.ExecutorTarget) {}
+func (se *stubExecutor) Close() {}
+func (se *stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error) {
+ if stdin != nil {
+ _, err = io.Copy(&se.stdin, stdin)
+ if err != nil {
+ return nil, []byte(err.Error()), err
+ }
+ }
+ resp, ok := se.response[cmd]
+ if !ok && strings.Contains(cmd, `; cat >"$dstfile"`) {
+ resp, ok = se.response["{deploy}"]
+ }
if !ok {
- return nil, []byte("command not found\n"), errors.New("command not found")
+ return nil, []byte(fmt.Sprintf("%s: command not found\n", cmd)), errors.New("command not found")
}
return []byte(resp.stdout), []byte(resp.stderr), resp.err
}
commit 0c1c09cfd0a28bcc591f4cd5e2175c63091e4e83
Author: Tom Clegg <tom at tomclegg.ca>
Date: Wed Dec 18 11:55:32 2019 -0500
15759: Move crunch-run into arvados-server cmd.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>
diff --git a/build/run-build-packages.sh b/build/run-build-packages.sh
index d6c8f5ac6..3173aa705 100755
--- a/build/run-build-packages.sh
+++ b/build/run-build-packages.sh
@@ -294,7 +294,7 @@ package_go_binary services/crunch-dispatch-local crunch-dispatch-local \
"Dispatch Crunch containers on the local system"
package_go_binary services/crunch-dispatch-slurm crunch-dispatch-slurm \
"Dispatch Crunch containers to a SLURM cluster"
-package_go_binary services/crunch-run crunch-run \
+package_go_binary cmd/arvados-server crunch-run \
"Supervise a single Crunch container"
package_go_binary services/crunchstat crunchstat \
"Gather cpu/memory/network statistics of running Crunch jobs"
diff --git a/build/run-tests.sh b/build/run-tests.sh
index 02a6a4044..f21861762 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -81,6 +81,7 @@ lib/controller/railsproxy
lib/controller/router
lib/controller/rpc
lib/crunchstat
+lib/crunch-run
lib/cloud
lib/cloud/azure
lib/cloud/cloudtest
@@ -104,7 +105,6 @@ services/keep-balance
services/login-sync
services/nodemanager
services/nodemanager_integration
-services/crunch-run
services/crunch-dispatch-local
services/crunch-dispatch-slurm
services/ws
diff --git a/cmd/arvados-server/cmd.go b/cmd/arvados-server/cmd.go
index fe6689862..d93a8e78f 100644
--- a/cmd/arvados-server/cmd.go
+++ b/cmd/arvados-server/cmd.go
@@ -11,6 +11,7 @@ import (
"git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/lib/controller"
+ "git.arvados.org/arvados.git/lib/crunchrun"
"git.arvados.org/arvados.git/lib/dispatchcloud"
)
@@ -25,6 +26,7 @@ var (
"config-dump": config.DumpCommand,
"config-defaults": config.DumpDefaultsCommand,
"controller": controller.Command,
+ "crunch-run": crunchrun.Command,
"dispatch-cloud": dispatchcloud.Command,
})
)
diff --git a/lib/cmd/cmd.go b/lib/cmd/cmd.go
index 24b69f0cc..51bcf55c7 100644
--- a/lib/cmd/cmd.go
+++ b/lib/cmd/cmd.go
@@ -37,6 +37,10 @@ var version = "dev"
type versionCommand struct{}
+func (versionCommand) String() string {
+ return fmt.Sprintf("%s (%s)", version, runtime.Version())
+}
+
func (versionCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
prog = regexp.MustCompile(` -*version$`).ReplaceAllLiteralString(prog, "")
fmt.Fprintf(stdout, "%s %s (%s)\n", prog, version, runtime.Version())
@@ -61,9 +65,16 @@ type Multi map[string]Handler
func (m Multi) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
_, basename := filepath.Split(prog)
- basename = strings.TrimPrefix(basename, "arvados-")
- basename = strings.TrimPrefix(basename, "crunch-")
- if cmd, ok := m[basename]; ok {
+ cmd, ok := m[basename]
+ if !ok {
+ // "controller" command exists, and binary is named "arvados-controller"
+ cmd, ok = m[strings.TrimPrefix(basename, "arvados-")]
+ }
+ if !ok {
+ // "dispatch-slurm" command exists, and binary is named "crunch-dispatch-slurm"
+ cmd, ok = m[strings.TrimPrefix(basename, "crunch-")]
+ }
+ if ok {
return cmd.RunCommand(prog, args, stdin, stdout, stderr)
} else if len(args) < 1 {
fmt.Fprintf(stderr, "usage: %s command [args]\n", prog)
diff --git a/services/crunch-run/background.go b/lib/crunchrun/background.go
similarity index 82%
rename from services/crunch-run/background.go
rename to lib/crunchrun/background.go
index 852ccb6ec..bf039afa0 100644
--- a/services/crunch-run/background.go
+++ b/lib/crunchrun/background.go
@@ -2,7 +2,7 @@
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package crunchrun
import (
"encoding/json"
@@ -36,10 +36,10 @@ type procinfo struct {
//
// Stdout and stderr in the child process are sent to the systemd
// journal using the systemd-cat program.
-func Detach(uuid string, args []string, stdout, stderr io.Writer) int {
- return exitcode(stderr, detach(uuid, args, stdout, stderr))
+func Detach(uuid string, prog string, args []string, stdout, stderr io.Writer) int {
+ return exitcode(stderr, detach(uuid, prog, args, stdout, stderr))
}
-func detach(uuid string, args []string, stdout, stderr io.Writer) error {
+func detach(uuid string, prog string, args []string, stdout, stderr io.Writer) error {
lockfile, err := func() (*os.File, error) {
// We must hold the dir-level lock between
// opening/creating the lockfile and acquiring LOCK_EX
@@ -68,7 +68,31 @@ func detach(uuid string, args []string, stdout, stderr io.Writer) error {
defer lockfile.Close()
lockfile.Truncate(0)
- cmd := exec.Command("systemd-cat", append([]string{"--identifier=crunch-run", args[0], "-no-detach"}, args[1:]...)...)
+ execargs := append([]string{"-no-detach"}, args...)
+ if strings.HasSuffix(prog, " crunch-run") {
+ // invoked as "/path/to/arvados-server crunch-run"
+ // (see arvados/lib/cmd.Multi)
+ execargs = append([]string{strings.TrimSuffix(prog, " crunch-run"), "crunch-run"}, execargs...)
+ } else {
+ // invoked as "/path/to/crunch-run"
+ execargs = append([]string{prog}, execargs...)
+ }
+ execargs = append([]string{
+ // Here, if the inner systemd-cat can't exec
+ // crunch-run, it writes an error message to stderr,
+ // and the outer systemd-cat writes it to the journal
+ // where the operator has a chance to discover it. (If
+ // we only used one systemd-cat command, it would be
+ // up to us to report the error -- but we are going to
+ // detach and exit, not wait for something to appear
+ // on stderr.) Note these systemd-cat calls don't
+ // result in additional processes -- they just connect
+ // stderr/stdout to sockets and call exec().
+ "systemd-cat", "--identifier=crunch-run",
+ "systemd-cat", "--identifier=crunch-run",
+ }, execargs...)
+
+ cmd := exec.Command(execargs[0], execargs[1:]...)
// Child inherits lockfile.
cmd.ExtraFiles = []*os.File{lockfile}
// Ensure child isn't interrupted even if we receive signals
diff --git a/services/crunch-run/cgroup.go b/lib/crunchrun/cgroup.go
similarity index 97%
rename from services/crunch-run/cgroup.go
rename to lib/crunchrun/cgroup.go
index 9e52de52a..0b254f5bd 100644
--- a/services/crunch-run/cgroup.go
+++ b/lib/crunchrun/cgroup.go
@@ -2,7 +2,7 @@
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package crunchrun
import (
"bytes"
diff --git a/services/crunch-run/cgroup_test.go b/lib/crunchrun/cgroup_test.go
similarity index 95%
rename from services/crunch-run/cgroup_test.go
rename to lib/crunchrun/cgroup_test.go
index 95adf7484..b43479a3b 100644
--- a/services/crunch-run/cgroup_test.go
+++ b/lib/crunchrun/cgroup_test.go
@@ -2,7 +2,7 @@
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package crunchrun
import (
. "gopkg.in/check.v1"
diff --git a/services/crunch-run/copier.go b/lib/crunchrun/copier.go
similarity index 99%
rename from services/crunch-run/copier.go
rename to lib/crunchrun/copier.go
index e5711cc73..b1497277f 100644
--- a/services/crunch-run/copier.go
+++ b/lib/crunchrun/copier.go
@@ -2,7 +2,7 @@
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package crunchrun
import (
"encoding/json"
diff --git a/services/crunch-run/copier_test.go b/lib/crunchrun/copier_test.go
similarity index 99%
rename from services/crunch-run/copier_test.go
rename to lib/crunchrun/copier_test.go
index d767bc463..777b715d7 100644
--- a/services/crunch-run/copier_test.go
+++ b/lib/crunchrun/copier_test.go
@@ -2,7 +2,7 @@
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package crunchrun
import (
"io"
diff --git a/services/crunch-run/crunchrun.go b/lib/crunchrun/crunchrun.go
similarity index 95%
rename from services/crunch-run/crunchrun.go
rename to lib/crunchrun/crunchrun.go
index 0a19d17f4..b0a4007f7 100644
--- a/services/crunch-run/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -2,7 +2,7 @@
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package crunchrun
import (
"bytes"
@@ -27,6 +27,7 @@ import (
"syscall"
"time"
+ "git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/lib/crunchstat"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
@@ -40,7 +41,9 @@ import (
dockerclient "github.com/docker/docker/client"
)
-var version = "dev"
+type command struct{}
+
+var Command = command{}
// IArvadosClient is the minimal Arvados API methods used by crunch-run.
type IArvadosClient interface {
@@ -1527,7 +1530,7 @@ func (runner *ContainerRunner) NewArvLogWriter(name string) (io.WriteCloser, err
// Run the full container lifecycle.
func (runner *ContainerRunner) Run() (err error) {
- runner.CrunchLog.Printf("crunch-run %s started", version)
+ runner.CrunchLog.Printf("crunch-run %s started", cmd.Version.String())
runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID)
hostname, hosterr := os.Hostname()
@@ -1758,83 +1761,93 @@ func NewContainerRunner(dispatcherClient *arvados.Client,
return cr, nil
}
-func main() {
- statInterval := flag.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
- cgroupRoot := flag.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
- cgroupParent := flag.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
- cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
- caCertsPath := flag.String("ca-certs", "", "Path to TLS root certificates")
- detach := flag.Bool("detach", false, "Detach from parent process and run in the background")
- stdinEnv := flag.Bool("stdin-env", false, "Load environment variables from JSON message on stdin")
- sleep := flag.Duration("sleep", 0, "Delay before starting (testing use only)")
- kill := flag.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
- list := flag.Bool("list", false, "List UUIDs of existing crunch-run processes")
- enableNetwork := flag.String("container-enable-networking", "default",
+func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+ flags := flag.NewFlagSet(prog, flag.ContinueOnError)
+ statInterval := flags.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
+ cgroupRoot := flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
+ cgroupParent := flags.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
+ cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
+ caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates")
+ detach := flags.Bool("detach", false, "Detach from parent process and run in the background")
+ stdinEnv := flags.Bool("stdin-env", false, "Load environment variables from JSON message on stdin")
+ sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
+ kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
+ list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
+ enableNetwork := flags.String("container-enable-networking", "default",
`Specify if networking should be enabled for container. One of 'default', 'always':
default: only enable networking if container requests it.
always: containers always have networking enabled
`)
- networkMode := flag.String("container-network-mode", "default",
+ networkMode := flags.String("container-network-mode", "default",
`Set networking mode for container. Corresponds to Docker network mode (--net).
`)
- memprofile := flag.String("memprofile", "", "write memory profile to `file` after running container")
- getVersion := flag.Bool("version", false, "Print version information and exit.")
- flag.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
+ memprofile := flags.String("memprofile", "", "write memory profile to `file` after running container")
+ flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
ignoreDetachFlag := false
- if len(os.Args) > 1 && os.Args[1] == "-no-detach" {
+ if len(args) > 0 && args[0] == "-no-detach" {
// This process was invoked by a parent process, which
// has passed along its own arguments, including
// -detach, after the leading -no-detach flag. Strip
// the leading -no-detach flag (it's not recognized by
- // flag.Parse()) and ignore the -detach flag that
+ // flags.Parse()) and ignore the -detach flag that
// comes later.
- os.Args = append([]string{os.Args[0]}, os.Args[2:]...)
+ args = args[1:]
ignoreDetachFlag = true
}
- flag.Parse()
+ if err := flags.Parse(args); err == flag.ErrHelp {
+ return 0
+ } else if err != nil {
+ log.Print(err)
+ return 1
+ }
if *stdinEnv && !ignoreDetachFlag {
// Load env vars on stdin if asked (but not in a
// detached child process, in which case stdin is
// /dev/null).
- loadEnv(os.Stdin)
+ err := loadEnv(os.Stdin)
+ if err != nil {
+ log.Print(err)
+ return 1
+ }
}
+ containerId := flags.Arg(0)
+
switch {
case *detach && !ignoreDetachFlag:
- os.Exit(Detach(flag.Arg(0), os.Args, os.Stdout, os.Stderr))
+ return Detach(containerId, prog, args, os.Stdout, os.Stderr)
case *kill >= 0:
- os.Exit(KillProcess(flag.Arg(0), syscall.Signal(*kill), os.Stdout, os.Stderr))
+ return KillProcess(containerId, syscall.Signal(*kill), os.Stdout, os.Stderr)
case *list:
- os.Exit(ListProcesses(os.Stdout, os.Stderr))
+ return ListProcesses(os.Stdout, os.Stderr)
}
- // Print version information if requested
- if *getVersion {
- fmt.Printf("crunch-run %s\n", version)
- return
+ if containerId == "" {
+ log.Printf("usage: %s [options] UUID", prog)
+ return 1
}
- log.Printf("crunch-run %s started", version)
+ log.Printf("crunch-run %s started", cmd.Version.String())
time.Sleep(*sleep)
- containerId := flag.Arg(0)
-
if *caCertsPath != "" {
arvadosclient.CertFiles = []string{*caCertsPath}
}
api, err := arvadosclient.MakeArvadosClient()
if err != nil {
- log.Fatalf("%s: %v", containerId, err)
+ log.Printf("%s: %v", containerId, err)
+ return 1
}
api.Retries = 8
kc, kcerr := keepclient.MakeKeepClient(api)
if kcerr != nil {
- log.Fatalf("%s: %v", containerId, kcerr)
+ log.Printf("%s: %v", containerId, kcerr)
+ return 1
}
kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
kc.Retries = 4
@@ -1845,18 +1858,20 @@ func main() {
cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, docker, containerId)
if err != nil {
- log.Fatal(err)
+ log.Print(err)
+ return 1
}
if dockererr != nil {
cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
cr.checkBrokenNode(dockererr)
cr.CrunchLog.Close()
- os.Exit(1)
+ return 1
}
parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerId+".")
if tmperr != nil {
- log.Fatalf("%s: %v", containerId, tmperr)
+ log.Printf("%s: %v", containerId, tmperr)
+ return 1
}
cr.parentTemp = parentTemp
@@ -1889,24 +1904,27 @@ func main() {
}
if runerr != nil {
- log.Fatalf("%s: %v", containerId, runerr)
+ log.Printf("%s: %v", containerId, runerr)
+ return 1
}
+ return 0
}
-func loadEnv(rdr io.Reader) {
+func loadEnv(rdr io.Reader) error {
buf, err := ioutil.ReadAll(rdr)
if err != nil {
- log.Fatalf("read stdin: %s", err)
+ return fmt.Errorf("read stdin: %s", err)
}
var env map[string]string
err = json.Unmarshal(buf, &env)
if err != nil {
- log.Fatalf("decode stdin: %s", err)
+ return fmt.Errorf("decode stdin: %s", err)
}
for k, v := range env {
err = os.Setenv(k, v)
if err != nil {
- log.Fatalf("setenv(%q): %s", k, err)
+ return fmt.Errorf("setenv(%q): %s", k, err)
}
}
+ return nil
}
diff --git a/services/crunch-run/crunchrun_test.go b/lib/crunchrun/crunchrun_test.go
similarity index 99%
rename from services/crunch-run/crunchrun_test.go
rename to lib/crunchrun/crunchrun_test.go
index 636926a8d..e8c7660d1 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/lib/crunchrun/crunchrun_test.go
@@ -2,7 +2,7 @@
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package crunchrun
import (
"bufio"
diff --git a/services/crunch-run/git_mount.go b/lib/crunchrun/git_mount.go
similarity index 99%
rename from services/crunch-run/git_mount.go
rename to lib/crunchrun/git_mount.go
index 4fcba1379..92bb6d11d 100644
--- a/services/crunch-run/git_mount.go
+++ b/lib/crunchrun/git_mount.go
@@ -2,7 +2,7 @@
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package crunchrun
import (
"fmt"
diff --git a/services/crunch-run/git_mount_test.go b/lib/crunchrun/git_mount_test.go
similarity index 99%
rename from services/crunch-run/git_mount_test.go
rename to lib/crunchrun/git_mount_test.go
index 608917808..e39beaa94 100644
--- a/services/crunch-run/git_mount_test.go
+++ b/lib/crunchrun/git_mount_test.go
@@ -2,7 +2,7 @@
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package crunchrun
import (
"io/ioutil"
diff --git a/services/crunch-run/logging.go b/lib/crunchrun/logging.go
similarity index 99%
rename from services/crunch-run/logging.go
rename to lib/crunchrun/logging.go
index 01cab69cd..d5de184e5 100644
--- a/services/crunch-run/logging.go
+++ b/lib/crunchrun/logging.go
@@ -2,7 +2,7 @@
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package crunchrun
import (
"bufio"
diff --git a/services/crunch-run/logging_test.go b/lib/crunchrun/logging_test.go
similarity index 99%
rename from services/crunch-run/logging_test.go
rename to lib/crunchrun/logging_test.go
index 4a2944f64..fab333b43 100644
--- a/services/crunch-run/logging_test.go
+++ b/lib/crunchrun/logging_test.go
@@ -2,7 +2,7 @@
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package crunchrun
import (
"fmt"
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list