[ARVADOS] created: 1.3.0-2009-gdd9367afe

Git user git at public.arvados.org
Mon Dec 30 17:03:29 UTC 2019


        at  dd9367afefff5d0cd38d1549e32e2794e4614fb4 (commit)


commit dd9367afefff5d0cd38d1549e32e2794e4614fb4
Author: Tom Clegg <tom at tomclegg.ca>
Date:   Mon Dec 30 11:25:23 2019 -0500

    15759: Improve logs.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>

diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index ed8f06394..357ac20a0 100644
--- a/lib/dispatchcloud/worker/worker.go
+++ b/lib/dispatchcloud/worker/worker.go
@@ -369,7 +369,6 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
 		wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
 		return false, stderr2
 	} else {
-		wkr.logger.Info("runner binary OK")
 		stderr = append(stderr, stderr2...)
 	}
 	return true, stderr
@@ -378,9 +377,14 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
 func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
 	hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
 	dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
+	logger := wkr.logger.WithFields(logrus.Fields{
+		"hash": hash,
+		"path": wkr.wp.runnerCmd,
+	})
 
 	stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
 	if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+"  "+wkr.wp.runnerCmd+"\n")) {
+		logger.Info("runner binary already exists on worker, with correct hash")
 		return
 	}
 
@@ -391,6 +395,7 @@ func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
 	if wkr.instance.RemoteUser() != "root" {
 		cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
 	}
+	logger.WithField("cmd", cmd).Info("installing runner binary on worker")
 	stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
 	return
 }

commit 08ee2ab7699e7fee77f6c428977762cb12287166
Author: Tom Clegg <tom at tomclegg.ca>
Date:   Mon Dec 30 11:09:13 2019 -0500

    15759: Update file mode; avoid /var/run, typically mounted noexec.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>

diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 34bc26dea..c52422c41 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -790,7 +790,7 @@ func (wp *Pool) loadRunnerData() error {
 	}
 	wp.runnerData = buf
 	wp.runnerMD5 = md5.Sum(buf)
-	wp.runnerCmd = fmt.Sprintf("/var/run/arvados/crunch-run~%x", wp.runnerMD5)
+	wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
 	return nil
 }
 
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index a455a7d06..ed8f06394 100644
--- a/lib/dispatchcloud/worker/worker.go
+++ b/lib/dispatchcloud/worker/worker.go
@@ -387,7 +387,7 @@ func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
 	// Note touch+chmod come before writing data, to avoid the
 	// possibility of md5 being correct while file mode is
 	// incorrect.
-	cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0700 "$dstfile"; cat >"$dstfile"`
+	cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"`
 	if wkr.instance.RemoteUser() != "root" {
 		cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
 	}

commit c347c99a6ffe9d72f6decf29d2553bc6caf126c2
Author: Tom Clegg <tom at tomclegg.ca>
Date:   Mon Dec 30 10:55:38 2019 -0500

    15759: Add DeployRunnerBinary to config.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>

diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 6bc269c7e..34bc26dea 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -102,6 +102,7 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
 		instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
 		newExecutor:        newExecutor,
 		bootProbeCommand:   cluster.Containers.CloudVMs.BootProbeCommand,
+		runnerSource:       cluster.Containers.CloudVMs.DeployRunnerBinary,
 		imageID:            cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
 		instanceTypes:      cluster.InstanceTypes,
 		maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index 433e361e1..ca39d6b26 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -390,6 +390,7 @@ type CloudVMsConfig struct {
 	Enable bool
 
 	BootProbeCommand     string
+	DeployRunnerBinary   string
 	ImageID              string
 	MaxCloudOpsPerSecond int
 	MaxProbesPerSecond   int

commit a3c7d9e03062e3246b0857fbae05f45d22e39169
Merge: da3cd46f9 949abe8b7
Author: Tom Clegg <tom at tomclegg.ca>
Date:   Mon Dec 30 10:18:15 2019 -0500

    15759: Merge branch 'master' into 15759-deploy-crunch-run
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>


commit da3cd46f9204f522b73023b42e8a4d04f697e9c7
Author: Tom Clegg <tom at tomclegg.ca>
Date:   Mon Dec 30 10:10:30 2019 -0500

    15759: Deploy supervisor binary from dispatcher to worker VMs.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>

diff --git a/lib/cmd/cmd.go b/lib/cmd/cmd.go
index 51bcf55c7..611c95d23 100644
--- a/lib/cmd/cmd.go
+++ b/lib/cmd/cmd.go
@@ -65,6 +65,11 @@ type Multi map[string]Handler
 
 func (m Multi) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
 	_, basename := filepath.Split(prog)
+	if i := strings.Index(basename, "~"); i >= 0 {
+		// drop "~anything" suffix (arvados-dispatch-cloud's
+		// DeployRunnerBinary feature relies on this)
+		basename = basename[:i]
+	}
 	cmd, ok := m[basename]
 	if !ok {
 		// "controller" command exists, and binary is named "arvados-controller"
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 7779d7ebf..ccc6343e6 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -788,6 +788,16 @@ Clusters:
         # Worker VM image ID.
         ImageID: ""
 
+        # An executable file (located on the dispatcher host) to be
+        # copied to cloud instances at runtime and used as the
+        # container runner/supervisor. The default value is the
+        # dispatcher program itself.
+        #
+        # Use the empty string to disable this step: nothing will be
+        # copied, and cloud instances are assumed to have a suitable
+        # version of crunch-run installed.
+        DeployRunnerBinary: "/proc/self/exe"
+
         # Tags to add on all resources (VMs, NICs, disks) created by
         # the container dispatcher. (Arvados's own tags --
         # InstanceType, IdleBehavior, and InstanceSecret -- will also
diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go
index 3e58d249b..3d4c18e28 100644
--- a/lib/config/generated_config.go
+++ b/lib/config/generated_config.go
@@ -794,6 +794,16 @@ Clusters:
         # Worker VM image ID.
         ImageID: ""
 
+        # An executable file (located on the dispatcher host) to be
+        # copied to cloud instances at runtime and used as the
+        # container runner/supervisor. The default value is the
+        # dispatcher program itself.
+        #
+        # Use the empty string to disable this step: nothing will be
+        # copied, and cloud instances are assumed to have a suitable
+        # version of crunch-run installed.
+        DeployRunnerBinary: "/proc/self/exe"
+
         # Tags to add on all resources (VMs, NICs, disks) created by
         # the container dispatcher. (Arvados's own tags --
         # InstanceType, IdleBehavior, and InstanceSecret -- will also
diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
index ce7dc0730..19c8f6e0b 100644
--- a/lib/dispatchcloud/dispatcher.go
+++ b/lib/dispatchcloud/dispatcher.go
@@ -37,6 +37,7 @@ const (
 
 type pool interface {
 	scheduler.WorkerPool
+	CheckHealth() error
 	Instances() []worker.InstanceView
 	SetIdleBehavior(cloud.InstanceID, worker.IdleBehavior) error
 	KillInstance(id cloud.InstanceID, reason string) error
@@ -78,7 +79,7 @@ func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 // CheckHealth implements service.Handler.
 func (disp *dispatcher) CheckHealth() error {
 	disp.Start()
-	return nil
+	return disp.pool.CheckHealth()
 }
 
 // Stop dispatching containers and release resources. Typically used
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 649910f63..6bc269c7e 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -5,10 +5,12 @@
 package worker
 
 import (
+	"crypto/md5"
 	"crypto/rand"
 	"errors"
 	"fmt"
 	"io"
+	"io/ioutil"
 	"sort"
 	"strings"
 	"sync"
@@ -135,6 +137,7 @@ type Pool struct {
 	instanceSet        *throttledInstanceSet
 	newExecutor        func(cloud.Instance) Executor
 	bootProbeCommand   string
+	runnerSource       string
 	imageID            cloud.ImageID
 	instanceTypes      map[string]arvados.InstanceType
 	syncInterval       time.Duration
@@ -160,6 +163,9 @@ type Pool struct {
 	stop         chan bool
 	mtx          sync.RWMutex
 	setupOnce    sync.Once
+	runnerData   []byte
+	runnerMD5    [md5.Size]byte
+	runnerCmd    string
 
 	throttleCreate    throttle
 	throttleInstances throttle
@@ -177,6 +183,14 @@ type createCall struct {
 	instanceType arvados.InstanceType
 }
 
+func (wp *Pool) CheckHealth() error {
+	wp.setupOnce.Do(wp.setup)
+	if err := wp.loadRunnerData(); err != nil {
+		return fmt.Errorf("error loading runner binary: %s", err)
+	}
+	return nil
+}
+
 // Subscribe returns a buffered channel that becomes ready after any
 // change to the pool's state that could have scheduling implications:
 // a worker's state changes, a new worker appears, the cloud
@@ -276,6 +290,10 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
 func (wp *Pool) Create(it arvados.InstanceType) bool {
 	logger := wp.logger.WithField("InstanceType", it.Name)
 	wp.setupOnce.Do(wp.setup)
+	if wp.loadRunnerData() != nil {
+		// Boot probe is certain to fail.
+		return false
+	}
 	wp.mtx.Lock()
 	defer wp.mtx.Unlock()
 	if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
@@ -743,6 +761,36 @@ func (wp *Pool) setup() {
 	wp.exited = map[string]time.Time{}
 	wp.workers = map[cloud.InstanceID]*worker{}
 	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
+	wp.loadRunnerData()
+}
+
+// Load the runner program to be deployed on worker nodes into
+// wp.runnerData, if necessary. Errors are logged.
+//
+// If auto-deploy is disabled, len(wp.runnerData) will be 0.
+//
+// Caller must not have lock.
+func (wp *Pool) loadRunnerData() error {
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
+	if wp.runnerData != nil {
+		return nil
+	} else if wp.runnerSource == "" {
+		wp.runnerCmd = "crunch-run"
+		wp.runnerData = []byte{}
+		return nil
+	}
+	logger := wp.logger.WithField("source", wp.runnerSource)
+	logger.Debug("loading runner")
+	buf, err := ioutil.ReadFile(wp.runnerSource)
+	if err != nil {
+		logger.WithError(err).Error("failed to load runner program")
+		return err
+	}
+	wp.runnerData = buf
+	wp.runnerMD5 = md5.Sum(buf)
+	wp.runnerCmd = fmt.Sprintf("/var/run/arvados/crunch-run~%x", wp.runnerMD5)
+	return nil
 }
 
 func (wp *Pool) notify() {
diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
index 8b2415b40..1948c1e87 100644
--- a/lib/dispatchcloud/worker/pool_test.go
+++ b/lib/dispatchcloud/worker/pool_test.go
@@ -70,9 +70,11 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
 	c.Assert(err, check.IsNil)
 
 	newExecutor := func(cloud.Instance) Executor {
-		return stubExecutor{
-			"crunch-run --list": stubResp{},
-			"true":              stubResp{},
+		return &stubExecutor{
+			response: map[string]stubResp{
+				"crunch-run --list": stubResp{},
+				"true":              stubResp{},
+			},
 		}
 	}
 
@@ -155,7 +157,7 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
 	type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
 	pool := &Pool{
 		logger:      logger,
-		newExecutor: func(cloud.Instance) Executor { return stubExecutor{} },
+		newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
 		instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
 		instanceTypes: arvados.InstanceTypeMap{
 			type1.Name: type1,
diff --git a/lib/dispatchcloud/worker/runner.go b/lib/dispatchcloud/worker/runner.go
index e819a6036..475212134 100644
--- a/lib/dispatchcloud/worker/runner.go
+++ b/lib/dispatchcloud/worker/runner.go
@@ -20,6 +20,7 @@ type remoteRunner struct {
 	uuid          string
 	executor      Executor
 	envJSON       json.RawMessage
+	runnerCmd     string
 	remoteUser    string
 	timeoutTERM   time.Duration
 	timeoutSignal time.Duration
@@ -59,6 +60,7 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
 		uuid:          uuid,
 		executor:      wkr.executor,
 		envJSON:       envJSON,
+		runnerCmd:     wkr.wp.runnerCmd,
 		remoteUser:    wkr.instance.RemoteUser(),
 		timeoutTERM:   wkr.wp.timeoutTERM,
 		timeoutSignal: wkr.wp.timeoutSignal,
@@ -76,7 +78,7 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
 // assume the remote process _might_ have started, at least until it
 // probes the worker and finds otherwise.
 func (rr *remoteRunner) Start() {
-	cmd := "crunch-run --detach --stdin-env '" + rr.uuid + "'"
+	cmd := rr.runnerCmd + " --detach --stdin-env '" + rr.uuid + "'"
 	if rr.remoteUser != "root" {
 		cmd = "sudo " + cmd
 	}
@@ -136,7 +138,7 @@ func (rr *remoteRunner) Kill(reason string) {
 func (rr *remoteRunner) kill(sig syscall.Signal) {
 	logger := rr.logger.WithField("Signal", int(sig))
 	logger.Info("sending signal")
-	cmd := fmt.Sprintf("crunch-run --kill %d %s", sig, rr.uuid)
+	cmd := fmt.Sprintf(rr.runnerCmd+" --kill %d %s", sig, rr.uuid)
 	if rr.remoteUser != "root" {
 		cmd = "sudo " + cmd
 	}
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index cda5a14b1..a455a7d06 100644
--- a/lib/dispatchcloud/worker/worker.go
+++ b/lib/dispatchcloud/worker/worker.go
@@ -5,7 +5,9 @@
 package worker
 
 import (
+	"bytes"
 	"fmt"
+	"path/filepath"
 	"strings"
 	"sync"
 	"time"
@@ -318,7 +320,7 @@ func (wkr *worker) probeAndUpdate() {
 }
 
 func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
-	cmd := "crunch-run --list"
+	cmd := wkr.wp.runnerCmd + " --list"
 	if u := wkr.instance.RemoteUser(); u != "root" {
 		cmd = "sudo " + cmd
 	}
@@ -358,9 +360,41 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
 		return false, stderr
 	}
 	logger.Info("boot probe succeeded")
+	if err = wkr.wp.loadRunnerData(); err != nil {
+		wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
+		return false, stderr
+	} else if len(wkr.wp.runnerData) == 0 {
+		// Assume crunch-run is already installed
+	} else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
+		wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
+		return false, stderr2
+	} else {
+		wkr.logger.Info("runner binary OK")
+		stderr = append(stderr, stderr2...)
+	}
 	return true, stderr
 }
 
+func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
+	hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
+	dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
+
+	stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
+	if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+"  "+wkr.wp.runnerCmd+"\n")) {
+		return
+	}
+
+	// Note touch+chmod come before writing data, to avoid the
+	// possibility of md5 being correct while file mode is
+	// incorrect.
+	cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0700 "$dstfile"; cat >"$dstfile"`
+	if wkr.instance.RemoteUser() != "root" {
+		cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
+	}
+	stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
+	return
+}
+
 // caller must have lock.
 func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
 	if wkr.idleBehavior == IdleBehaviorHold {
diff --git a/lib/dispatchcloud/worker/worker_test.go b/lib/dispatchcloud/worker/worker_test.go
index 4f8f77bff..a4c2a6370 100644
--- a/lib/dispatchcloud/worker/worker_test.go
+++ b/lib/dispatchcloud/worker/worker_test.go
@@ -5,8 +5,12 @@
 package worker
 
 import (
+	"bytes"
+	"crypto/md5"
 	"errors"
+	"fmt"
 	"io"
+	"strings"
 	"time"
 
 	"git.arvados.org/arvados.git/lib/cloud"
@@ -38,7 +42,11 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
 		running         int
 		starting        int
 		respBoot        stubResp // zero value is success
+		respDeploy      stubResp // zero value is success
 		respRun         stubResp // zero value is success + nothing running
+		respRunDeployed stubResp
+		deployRunner    []byte
+		expectStdin     []byte
 		expectState     State
 		expectRunning   int
 	}
@@ -46,7 +54,7 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
 	errFail := errors.New("failed")
 	respFail := stubResp{"", "command failed\n", errFail}
 	respContainerRunning := stubResp{"zzzzz-dz642-abcdefghijklmno\n", "", nil}
-	for _, trial := range []trialT{
+	for idx, trial := range []trialT{
 		{
 			testCaseComment: "Unknown, probes fail",
 			state:           StateUnknown,
@@ -185,12 +193,40 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
 			starting:        1,
 			expectState:     StateRunning,
 		},
+		{
+			testCaseComment: "Booting, boot probe succeeds, deployRunner succeeds, run probe succeeds",
+			state:           StateBooting,
+			deployRunner:    []byte("ELF"),
+			expectStdin:     []byte("ELF"),
+			respRun:         respFail,
+			respRunDeployed: respContainerRunning,
+			expectRunning:   1,
+			expectState:     StateRunning,
+		},
+		{
+			testCaseComment: "Booting, boot probe succeeds, deployRunner fails",
+			state:           StateBooting,
+			deployRunner:    []byte("ELF"),
+			respDeploy:      respFail,
+			expectStdin:     []byte("ELF"),
+			expectState:     StateBooting,
+		},
+		{
+			testCaseComment: "Booting, boot probe succeeds, deployRunner skipped, run probe succeeds",
+			state:           StateBooting,
+			deployRunner:    nil,
+			respDeploy:      respFail,
+			expectState:     StateIdle,
+		},
 	} {
-		c.Logf("------- %#v", trial)
+		c.Logf("------- trial %d: %#v", idx, trial)
 		ctime := time.Now().Add(-trial.age)
-		exr := stubExecutor{
-			"bootprobe":         trial.respBoot,
-			"crunch-run --list": trial.respRun,
+		exr := &stubExecutor{
+			response: map[string]stubResp{
+				"bootprobe":         trial.respBoot,
+				"crunch-run --list": trial.respRun,
+				"{deploy}":          trial.respDeploy,
+			},
 		}
 		wp := &Pool{
 			arvClient:        ac,
@@ -199,6 +235,14 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
 			timeoutBooting:   bootTimeout,
 			timeoutProbe:     probeTimeout,
 			exited:           map[string]time.Time{},
+			runnerCmd:        "crunch-run",
+			runnerData:       trial.deployRunner,
+			runnerMD5:        md5.Sum(trial.deployRunner),
+		}
+		if trial.deployRunner != nil {
+			svHash := md5.Sum(trial.deployRunner)
+			wp.runnerCmd = fmt.Sprintf("/var/run/arvados/crunch-run~%x", svHash)
+			exr.response[wp.runnerCmd+" --list"] = trial.respRunDeployed
 		}
 		wkr := &worker{
 			logger:   logger,
@@ -226,6 +270,7 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
 		wkr.probeAndUpdate()
 		c.Check(wkr.state, check.Equals, trial.expectState)
 		c.Check(len(wkr.running), check.Equals, trial.expectRunning)
+		c.Check(exr.stdin.String(), check.Equals, string(trial.expectStdin))
 	}
 }
 
@@ -234,14 +279,27 @@ type stubResp struct {
 	stderr string
 	err    error
 }
-type stubExecutor map[string]stubResp
 
-func (se stubExecutor) SetTarget(cloud.ExecutorTarget) {}
-func (se stubExecutor) Close()                         {}
-func (se stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error) {
-	resp, ok := se[cmd]
+type stubExecutor struct {
+	response map[string]stubResp
+	stdin    bytes.Buffer
+}
+
+func (se *stubExecutor) SetTarget(cloud.ExecutorTarget) {}
+func (se *stubExecutor) Close()                         {}
+func (se *stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error) {
+	if stdin != nil {
+		_, err = io.Copy(&se.stdin, stdin)
+		if err != nil {
+			return nil, []byte(err.Error()), err
+		}
+	}
+	resp, ok := se.response[cmd]
+	if !ok && strings.Contains(cmd, `; cat >"$dstfile"`) {
+		resp, ok = se.response["{deploy}"]
+	}
 	if !ok {
-		return nil, []byte("command not found\n"), errors.New("command not found")
+		return nil, []byte(fmt.Sprintf("%s: command not found\n", cmd)), errors.New("command not found")
 	}
 	return []byte(resp.stdout), []byte(resp.stderr), resp.err
 }

commit 0c1c09cfd0a28bcc591f4cd5e2175c63091e4e83
Author: Tom Clegg <tom at tomclegg.ca>
Date:   Wed Dec 18 11:55:32 2019 -0500

    15759: Move crunch-run into arvados-server cmd.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>

diff --git a/build/run-build-packages.sh b/build/run-build-packages.sh
index d6c8f5ac6..3173aa705 100755
--- a/build/run-build-packages.sh
+++ b/build/run-build-packages.sh
@@ -294,7 +294,7 @@ package_go_binary services/crunch-dispatch-local crunch-dispatch-local \
     "Dispatch Crunch containers on the local system"
 package_go_binary services/crunch-dispatch-slurm crunch-dispatch-slurm \
     "Dispatch Crunch containers to a SLURM cluster"
-package_go_binary services/crunch-run crunch-run \
+package_go_binary cmd/arvados-server crunch-run \
     "Supervise a single Crunch container"
 package_go_binary services/crunchstat crunchstat \
     "Gather cpu/memory/network statistics of running Crunch jobs"
diff --git a/build/run-tests.sh b/build/run-tests.sh
index 02a6a4044..f21861762 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -81,6 +81,7 @@ lib/controller/railsproxy
 lib/controller/router
 lib/controller/rpc
 lib/crunchstat
+lib/crunch-run
 lib/cloud
 lib/cloud/azure
 lib/cloud/cloudtest
@@ -104,7 +105,6 @@ services/keep-balance
 services/login-sync
 services/nodemanager
 services/nodemanager_integration
-services/crunch-run
 services/crunch-dispatch-local
 services/crunch-dispatch-slurm
 services/ws
diff --git a/cmd/arvados-server/cmd.go b/cmd/arvados-server/cmd.go
index fe6689862..d93a8e78f 100644
--- a/cmd/arvados-server/cmd.go
+++ b/cmd/arvados-server/cmd.go
@@ -11,6 +11,7 @@ import (
 	"git.arvados.org/arvados.git/lib/cmd"
 	"git.arvados.org/arvados.git/lib/config"
 	"git.arvados.org/arvados.git/lib/controller"
+	"git.arvados.org/arvados.git/lib/crunchrun"
 	"git.arvados.org/arvados.git/lib/dispatchcloud"
 )
 
@@ -25,6 +26,7 @@ var (
 		"config-dump":     config.DumpCommand,
 		"config-defaults": config.DumpDefaultsCommand,
 		"controller":      controller.Command,
+		"crunch-run":      crunchrun.Command,
 		"dispatch-cloud":  dispatchcloud.Command,
 	})
 )
diff --git a/lib/cmd/cmd.go b/lib/cmd/cmd.go
index 24b69f0cc..51bcf55c7 100644
--- a/lib/cmd/cmd.go
+++ b/lib/cmd/cmd.go
@@ -37,6 +37,10 @@ var version = "dev"
 
 type versionCommand struct{}
 
+func (versionCommand) String() string {
+	return fmt.Sprintf("%s (%s)", version, runtime.Version())
+}
+
 func (versionCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
 	prog = regexp.MustCompile(` -*version$`).ReplaceAllLiteralString(prog, "")
 	fmt.Fprintf(stdout, "%s %s (%s)\n", prog, version, runtime.Version())
@@ -61,9 +65,16 @@ type Multi map[string]Handler
 
 func (m Multi) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
 	_, basename := filepath.Split(prog)
-	basename = strings.TrimPrefix(basename, "arvados-")
-	basename = strings.TrimPrefix(basename, "crunch-")
-	if cmd, ok := m[basename]; ok {
+	cmd, ok := m[basename]
+	if !ok {
+		// "controller" command exists, and binary is named "arvados-controller"
+		cmd, ok = m[strings.TrimPrefix(basename, "arvados-")]
+	}
+	if !ok {
+		// "dispatch-slurm" command exists, and binary is named "crunch-dispatch-slurm"
+		cmd, ok = m[strings.TrimPrefix(basename, "crunch-")]
+	}
+	if ok {
 		return cmd.RunCommand(prog, args, stdin, stdout, stderr)
 	} else if len(args) < 1 {
 		fmt.Fprintf(stderr, "usage: %s command [args]\n", prog)
diff --git a/services/crunch-run/background.go b/lib/crunchrun/background.go
similarity index 82%
rename from services/crunch-run/background.go
rename to lib/crunchrun/background.go
index 852ccb6ec..bf039afa0 100644
--- a/services/crunch-run/background.go
+++ b/lib/crunchrun/background.go
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package crunchrun
 
 import (
 	"encoding/json"
@@ -36,10 +36,10 @@ type procinfo struct {
 //
 // Stdout and stderr in the child process are sent to the systemd
 // journal using the systemd-cat program.
-func Detach(uuid string, args []string, stdout, stderr io.Writer) int {
-	return exitcode(stderr, detach(uuid, args, stdout, stderr))
+func Detach(uuid string, prog string, args []string, stdout, stderr io.Writer) int {
+	return exitcode(stderr, detach(uuid, prog, args, stdout, stderr))
 }
-func detach(uuid string, args []string, stdout, stderr io.Writer) error {
+func detach(uuid string, prog string, args []string, stdout, stderr io.Writer) error {
 	lockfile, err := func() (*os.File, error) {
 		// We must hold the dir-level lock between
 		// opening/creating the lockfile and acquiring LOCK_EX
@@ -68,7 +68,31 @@ func detach(uuid string, args []string, stdout, stderr io.Writer) error {
 	defer lockfile.Close()
 	lockfile.Truncate(0)
 
-	cmd := exec.Command("systemd-cat", append([]string{"--identifier=crunch-run", args[0], "-no-detach"}, args[1:]...)...)
+	execargs := append([]string{"-no-detach"}, args...)
+	if strings.HasSuffix(prog, " crunch-run") {
+		// invoked as "/path/to/arvados-server crunch-run"
+		// (see arvados/lib/cmd.Multi)
+		execargs = append([]string{strings.TrimSuffix(prog, " crunch-run"), "crunch-run"}, execargs...)
+	} else {
+		// invoked as "/path/to/crunch-run"
+		execargs = append([]string{prog}, execargs...)
+	}
+	execargs = append([]string{
+		// Here, if the inner systemd-cat can't exec
+		// crunch-run, it writes an error message to stderr,
+		// and the outer systemd-cat writes it to the journal
+		// where the operator has a chance to discover it. (If
+		// we only used one systemd-cat command, it would be
+		// up to us to report the error -- but we are going to
+		// detach and exit, not wait for something to appear
+		// on stderr.)  Note these systemd-cat calls don't
+		// result in additional processes -- they just connect
+		// stderr/stdout to sockets and call exec().
+		"systemd-cat", "--identifier=crunch-run",
+		"systemd-cat", "--identifier=crunch-run",
+	}, execargs...)
+
+	cmd := exec.Command(execargs[0], execargs[1:]...)
 	// Child inherits lockfile.
 	cmd.ExtraFiles = []*os.File{lockfile}
 	// Ensure child isn't interrupted even if we receive signals
diff --git a/services/crunch-run/cgroup.go b/lib/crunchrun/cgroup.go
similarity index 97%
rename from services/crunch-run/cgroup.go
rename to lib/crunchrun/cgroup.go
index 9e52de52a..0b254f5bd 100644
--- a/services/crunch-run/cgroup.go
+++ b/lib/crunchrun/cgroup.go
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package crunchrun
 
 import (
 	"bytes"
diff --git a/services/crunch-run/cgroup_test.go b/lib/crunchrun/cgroup_test.go
similarity index 95%
rename from services/crunch-run/cgroup_test.go
rename to lib/crunchrun/cgroup_test.go
index 95adf7484..b43479a3b 100644
--- a/services/crunch-run/cgroup_test.go
+++ b/lib/crunchrun/cgroup_test.go
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package crunchrun
 
 import (
 	. "gopkg.in/check.v1"
diff --git a/services/crunch-run/copier.go b/lib/crunchrun/copier.go
similarity index 99%
rename from services/crunch-run/copier.go
rename to lib/crunchrun/copier.go
index e5711cc73..b1497277f 100644
--- a/services/crunch-run/copier.go
+++ b/lib/crunchrun/copier.go
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package crunchrun
 
 import (
 	"encoding/json"
diff --git a/services/crunch-run/copier_test.go b/lib/crunchrun/copier_test.go
similarity index 99%
rename from services/crunch-run/copier_test.go
rename to lib/crunchrun/copier_test.go
index d767bc463..777b715d7 100644
--- a/services/crunch-run/copier_test.go
+++ b/lib/crunchrun/copier_test.go
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package crunchrun
 
 import (
 	"io"
diff --git a/services/crunch-run/crunchrun.go b/lib/crunchrun/crunchrun.go
similarity index 95%
rename from services/crunch-run/crunchrun.go
rename to lib/crunchrun/crunchrun.go
index 0a19d17f4..b0a4007f7 100644
--- a/services/crunch-run/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package crunchrun
 
 import (
 	"bytes"
@@ -27,6 +27,7 @@ import (
 	"syscall"
 	"time"
 
+	"git.arvados.org/arvados.git/lib/cmd"
 	"git.arvados.org/arvados.git/lib/crunchstat"
 	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
@@ -40,7 +41,9 @@ import (
 	dockerclient "github.com/docker/docker/client"
 )
 
-var version = "dev"
+type command struct{}
+
+var Command = command{}
 
 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
 type IArvadosClient interface {
@@ -1527,7 +1530,7 @@ func (runner *ContainerRunner) NewArvLogWriter(name string) (io.WriteCloser, err
 
 // Run the full container lifecycle.
 func (runner *ContainerRunner) Run() (err error) {
-	runner.CrunchLog.Printf("crunch-run %s started", version)
+	runner.CrunchLog.Printf("crunch-run %s started", cmd.Version.String())
 	runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID)
 
 	hostname, hosterr := os.Hostname()
@@ -1758,83 +1761,93 @@ func NewContainerRunner(dispatcherClient *arvados.Client,
 	return cr, nil
 }
 
-func main() {
-	statInterval := flag.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
-	cgroupRoot := flag.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
-	cgroupParent := flag.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
-	cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
-	caCertsPath := flag.String("ca-certs", "", "Path to TLS root certificates")
-	detach := flag.Bool("detach", false, "Detach from parent process and run in the background")
-	stdinEnv := flag.Bool("stdin-env", false, "Load environment variables from JSON message on stdin")
-	sleep := flag.Duration("sleep", 0, "Delay before starting (testing use only)")
-	kill := flag.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
-	list := flag.Bool("list", false, "List UUIDs of existing crunch-run processes")
-	enableNetwork := flag.String("container-enable-networking", "default",
+func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+	flags := flag.NewFlagSet(prog, flag.ContinueOnError)
+	statInterval := flags.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
+	cgroupRoot := flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
+	cgroupParent := flags.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
+	cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
+	caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates")
+	detach := flags.Bool("detach", false, "Detach from parent process and run in the background")
+	stdinEnv := flags.Bool("stdin-env", false, "Load environment variables from JSON message on stdin")
+	sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
+	kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
+	list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
+	enableNetwork := flags.String("container-enable-networking", "default",
 		`Specify if networking should be enabled for container.  One of 'default', 'always':
     	default: only enable networking if container requests it.
     	always:  containers always have networking enabled
     	`)
-	networkMode := flag.String("container-network-mode", "default",
+	networkMode := flags.String("container-network-mode", "default",
 		`Set networking mode for container.  Corresponds to Docker network mode (--net).
     	`)
-	memprofile := flag.String("memprofile", "", "write memory profile to `file` after running container")
-	getVersion := flag.Bool("version", false, "Print version information and exit.")
-	flag.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
+	memprofile := flags.String("memprofile", "", "write memory profile to `file` after running container")
+	flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
 
 	ignoreDetachFlag := false
-	if len(os.Args) > 1 && os.Args[1] == "-no-detach" {
+	if len(args) > 0 && args[0] == "-no-detach" {
 		// This process was invoked by a parent process, which
 		// has passed along its own arguments, including
 		// -detach, after the leading -no-detach flag.  Strip
 		// the leading -no-detach flag (it's not recognized by
-		// flag.Parse()) and ignore the -detach flag that
+		// flags.Parse()) and ignore the -detach flag that
 		// comes later.
-		os.Args = append([]string{os.Args[0]}, os.Args[2:]...)
+		args = args[1:]
 		ignoreDetachFlag = true
 	}
 
-	flag.Parse()
+	if err := flags.Parse(args); err == flag.ErrHelp {
+		return 0
+	} else if err != nil {
+		log.Print(err)
+		return 1
+	}
 
 	if *stdinEnv && !ignoreDetachFlag {
 		// Load env vars on stdin if asked (but not in a
 		// detached child process, in which case stdin is
 		// /dev/null).
-		loadEnv(os.Stdin)
+		err := loadEnv(os.Stdin)
+		if err != nil {
+			log.Print(err)
+			return 1
+		}
 	}
 
+	containerId := flags.Arg(0)
+
 	switch {
 	case *detach && !ignoreDetachFlag:
-		os.Exit(Detach(flag.Arg(0), os.Args, os.Stdout, os.Stderr))
+		return Detach(containerId, prog, args, os.Stdout, os.Stderr)
 	case *kill >= 0:
-		os.Exit(KillProcess(flag.Arg(0), syscall.Signal(*kill), os.Stdout, os.Stderr))
+		return KillProcess(containerId, syscall.Signal(*kill), os.Stdout, os.Stderr)
 	case *list:
-		os.Exit(ListProcesses(os.Stdout, os.Stderr))
+		return ListProcesses(os.Stdout, os.Stderr)
 	}
 
-	// Print version information if requested
-	if *getVersion {
-		fmt.Printf("crunch-run %s\n", version)
-		return
+	if containerId == "" {
+		log.Printf("usage: %s [options] UUID", prog)
+		return 1
 	}
 
-	log.Printf("crunch-run %s started", version)
+	log.Printf("crunch-run %s started", cmd.Version.String())
 	time.Sleep(*sleep)
 
-	containerId := flag.Arg(0)
-
 	if *caCertsPath != "" {
 		arvadosclient.CertFiles = []string{*caCertsPath}
 	}
 
 	api, err := arvadosclient.MakeArvadosClient()
 	if err != nil {
-		log.Fatalf("%s: %v", containerId, err)
+		log.Printf("%s: %v", containerId, err)
+		return 1
 	}
 	api.Retries = 8
 
 	kc, kcerr := keepclient.MakeKeepClient(api)
 	if kcerr != nil {
-		log.Fatalf("%s: %v", containerId, kcerr)
+		log.Printf("%s: %v", containerId, kcerr)
+		return 1
 	}
 	kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
 	kc.Retries = 4
@@ -1845,18 +1858,20 @@ func main() {
 
 	cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, docker, containerId)
 	if err != nil {
-		log.Fatal(err)
+		log.Print(err)
+		return 1
 	}
 	if dockererr != nil {
 		cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
 		cr.checkBrokenNode(dockererr)
 		cr.CrunchLog.Close()
-		os.Exit(1)
+		return 1
 	}
 
 	parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerId+".")
 	if tmperr != nil {
-		log.Fatalf("%s: %v", containerId, tmperr)
+		log.Printf("%s: %v", containerId, tmperr)
+		return 1
 	}
 
 	cr.parentTemp = parentTemp
@@ -1889,24 +1904,27 @@ func main() {
 	}
 
 	if runerr != nil {
-		log.Fatalf("%s: %v", containerId, runerr)
+		log.Printf("%s: %v", containerId, runerr)
+		return 1
 	}
+	return 0
 }
 
-func loadEnv(rdr io.Reader) {
+func loadEnv(rdr io.Reader) error {
 	buf, err := ioutil.ReadAll(rdr)
 	if err != nil {
-		log.Fatalf("read stdin: %s", err)
+		return fmt.Errorf("read stdin: %s", err)
 	}
 	var env map[string]string
 	err = json.Unmarshal(buf, &env)
 	if err != nil {
-		log.Fatalf("decode stdin: %s", err)
+		return fmt.Errorf("decode stdin: %s", err)
 	}
 	for k, v := range env {
 		err = os.Setenv(k, v)
 		if err != nil {
-			log.Fatalf("setenv(%q): %s", k, err)
+			return fmt.Errorf("setenv(%q): %s", k, err)
 		}
 	}
+	return nil
 }
diff --git a/services/crunch-run/crunchrun_test.go b/lib/crunchrun/crunchrun_test.go
similarity index 99%
rename from services/crunch-run/crunchrun_test.go
rename to lib/crunchrun/crunchrun_test.go
index 636926a8d..e8c7660d1 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/lib/crunchrun/crunchrun_test.go
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package crunchrun
 
 import (
 	"bufio"
diff --git a/services/crunch-run/git_mount.go b/lib/crunchrun/git_mount.go
similarity index 99%
rename from services/crunch-run/git_mount.go
rename to lib/crunchrun/git_mount.go
index 4fcba1379..92bb6d11d 100644
--- a/services/crunch-run/git_mount.go
+++ b/lib/crunchrun/git_mount.go
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package crunchrun
 
 import (
 	"fmt"
diff --git a/services/crunch-run/git_mount_test.go b/lib/crunchrun/git_mount_test.go
similarity index 99%
rename from services/crunch-run/git_mount_test.go
rename to lib/crunchrun/git_mount_test.go
index 608917808..e39beaa94 100644
--- a/services/crunch-run/git_mount_test.go
+++ b/lib/crunchrun/git_mount_test.go
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package crunchrun
 
 import (
 	"io/ioutil"
diff --git a/services/crunch-run/logging.go b/lib/crunchrun/logging.go
similarity index 99%
rename from services/crunch-run/logging.go
rename to lib/crunchrun/logging.go
index 01cab69cd..d5de184e5 100644
--- a/services/crunch-run/logging.go
+++ b/lib/crunchrun/logging.go
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package crunchrun
 
 import (
 	"bufio"
diff --git a/services/crunch-run/logging_test.go b/lib/crunchrun/logging_test.go
similarity index 99%
rename from services/crunch-run/logging_test.go
rename to lib/crunchrun/logging_test.go
index 4a2944f64..fab333b43 100644
--- a/services/crunch-run/logging_test.go
+++ b/lib/crunchrun/logging_test.go
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package crunchrun
 
 import (
 	"fmt"

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list