[ARVADOS] created: 1.3.0-138-g54643dadc
Git user
git at public.curoverse.com
Fri Dec 21 15:40:01 EST 2018
at 54643dadce1988a815b85a52349cb8a33cbdbf65 (commit)
commit 54643dadce1988a815b85a52349cb8a33cbdbf65
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Thu Dec 20 16:09:32 2018 -0500
14325: Add worker state diagram.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/lib/dispatchcloud/readme_states.txt b/lib/dispatchcloud/readme_states.txt
new file mode 100644
index 000000000..c78df3bce
--- /dev/null
+++ b/lib/dispatchcloud/readme_states.txt
@@ -0,0 +1,27 @@
+# cpan -I -T install Graph::Easy
+# (eval `perl -I ~/perl5/lib/perl5 -Mlocal::lib`; cpan -T install Graph::Easy)
+# graph-easy --as=svg < readme_states.txt
+
+[Nonexistent] - appears in cloud list -> [Unknown]
+[Nonexistent] - create() returns ID -> [Booting]
+[Unknown] - create() returns ID -> [Booting]
+[Unknown] - boot timeout -> [Shutdown]
+[Booting] - boot+run probes succeed -> [Idle]
+[Idle] - idle timeout -> [Shutdown]
+[Idle] - probe timeout -> [Shutdown]
+[Idle] - want=drain -> [Shutdown]
+[Idle] - container starts -> [Running]
+[Running] - container ends -> [Idle]
+[Running] - container ends, want=drain -> [Shutdown]
+[Shutdown] - instance disappears from cloud -> [Gone]
+
+# The Graph::Easy layouter fails if we add these edges, so they are commented out:
+#[Hold] - want=run -> [Booting]
+#[Hold] - want=drain -> [Shutdown]
+#[Running] - container ends, want=hold -> [Hold]
+#[Unknown] - want=hold -> [Hold]
+#[Booting] - want=hold -> [Hold]
+#[Idle] - want=hold -> [Hold]
+
+# Probably not worth drawing: Booting loops back to itself while the run probe keeps failing.
+#[Booting] - boot probe succeeds, run probe fails -> [Booting]
commit de12a680c6cd6395a700aca5d03604465afa3df5
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Tue Dec 18 15:50:17 2018 -0500
14325: Propagate API env vars to crunch-run.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
index 81ad0ed3f..2b7fd502f 100644
--- a/lib/dispatchcloud/dispatcher.go
+++ b/lib/dispatchcloud/dispatcher.go
@@ -127,7 +127,7 @@ func (disp *dispatcher) initialize() {
}
disp.instanceSet = &instanceSetProxy{instanceSet}
disp.reg = prometheus.NewRegistry()
- disp.pool = worker.NewPool(disp.logger, disp.reg, disp.instanceSet, disp.newExecutor, disp.Cluster)
+ disp.pool = worker.NewPool(disp.logger, arvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.Cluster)
disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, arvClient)
if disp.Cluster.ManagementToken == "" {
diff --git a/lib/dispatchcloud/ssh_executor/executor.go b/lib/dispatchcloud/ssh_executor/executor.go
index b5dba9870..4b2478e94 100644
--- a/lib/dispatchcloud/ssh_executor/executor.go
+++ b/lib/dispatchcloud/ssh_executor/executor.go
@@ -76,12 +76,18 @@ func (exr *Executor) Target() cloud.ExecutorTarget {
// Execute runs cmd on the target. If an existing connection is not
// usable, it sets up a new connection to the current target.
-func (exr *Executor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
+func (exr *Executor) Execute(env map[string]string, cmd string, stdin io.Reader) ([]byte, []byte, error) {
session, err := exr.newSession()
if err != nil {
return nil, nil, err
}
defer session.Close()
+ for k, v := range env {
+ err = session.Setenv(k, v)
+ if err != nil {
+ return nil, nil, err
+ }
+ }
var stdout, stderr bytes.Buffer
session.Stdin = stdin
session.Stdout = &stdout
diff --git a/lib/dispatchcloud/ssh_executor/executor_test.go b/lib/dispatchcloud/ssh_executor/executor_test.go
index 8dabfecad..619e47383 100644
--- a/lib/dispatchcloud/ssh_executor/executor_test.go
+++ b/lib/dispatchcloud/ssh_executor/executor_test.go
@@ -42,7 +42,8 @@ func (s *ExecutorSuite) TestExecute(c *check.C) {
for _, exitcode := range []int{0, 1, 2} {
srv := &testTarget{
SSHService: test.SSHService{
- Exec: func(cmd string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+ Exec: func(env map[string]string, cmd string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+ c.Check(env["TESTVAR"], check.Equals, "test value")
c.Check(cmd, check.Equals, command)
var wg sync.WaitGroup
wg.Add(2)
@@ -78,7 +79,7 @@ func (s *ExecutorSuite) TestExecute(c *check.C) {
done := make(chan bool)
go func() {
- stdout, stderr, err := exr.Execute(command, bytes.NewBufferString(stdinData))
+ stdout, stderr, err := exr.Execute(map[string]string{"TESTVAR": "test value"}, command, bytes.NewBufferString(stdinData))
if exitcode == 0 {
c.Check(err, check.IsNil)
} else {
diff --git a/lib/dispatchcloud/test/ssh_service.go b/lib/dispatchcloud/test/ssh_service.go
index b1e4e03b1..ed5995f4c 100644
--- a/lib/dispatchcloud/test/ssh_service.go
+++ b/lib/dispatchcloud/test/ssh_service.go
@@ -32,7 +32,7 @@ func LoadTestKey(c *check.C, fnm string) (ssh.PublicKey, ssh.Signer) {
// An SSHExecFunc handles an "exec" session on a multiplexed SSH
// connection.
-type SSHExecFunc func(command string, stdin io.Reader, stdout, stderr io.Writer) uint32
+type SSHExecFunc func(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32
// An SSHService accepts SSH connections on an available TCP port and
// passes clients' "exec" sessions to the provided SSHExecFunc.
@@ -146,22 +146,37 @@ func (ss *SSHService) serveConn(nConn net.Conn, config *ssh.ServerConfig) {
log.Printf("accept channel: %s", err)
return
}
- var execReq struct {
- Command string
- }
+ didExec := false
+ sessionEnv := map[string]string{}
go func() {
for req := range reqs {
- if req.Type == "exec" && execReq.Command == "" {
+ switch {
+ case didExec:
+ // Reject anything after exec
+ req.Reply(false, nil)
+ case req.Type == "exec":
+ var execReq struct {
+ Command string
+ }
req.Reply(true, nil)
ssh.Unmarshal(req.Payload, &execReq)
go func() {
var resp struct {
Status uint32
}
- resp.Status = ss.Exec(execReq.Command, ch, ch, ch.Stderr())
+ resp.Status = ss.Exec(sessionEnv, execReq.Command, ch, ch, ch.Stderr())
ch.SendRequest("exit-status", false, ssh.Marshal(&resp))
ch.Close()
}()
+ didExec = true
+ case req.Type == "env":
+ var envReq struct {
+ Name string
+ Value string
+ }
+ req.Reply(true, nil)
+ ssh.Unmarshal(req.Payload, &envReq)
+ sessionEnv[envReq.Name] = envReq.Value
}
}
}()
diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
index 8bdfaa947..136286b3d 100644
--- a/lib/dispatchcloud/test/stub_driver.go
+++ b/lib/dispatchcloud/test/stub_driver.go
@@ -155,7 +155,7 @@ func (svm *StubVM) Instance() stubInstance {
}
}
-func (svm *StubVM) Exec(command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
queue := svm.sis.driver.Queue
uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
if eta := svm.Boot.Sub(time.Now()); eta > 0 {
@@ -171,6 +171,12 @@ func (svm *StubVM) Exec(command string, stdin io.Reader, stdout, stderr io.Write
return 1
}
if strings.HasPrefix(command, "crunch-run --detach ") {
+ for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
+ if env[name] == "" {
+ fmt.Fprintf(stderr, "%s missing from environment %q\n", name, env)
+ return 1
+ }
+ }
svm.Lock()
if svm.running == nil {
svm.running = map[string]bool{}
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index ff5f762c1..50e0c066a 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -36,7 +36,7 @@ type InstanceView struct {
// An Executor executes shell commands on a remote host.
type Executor interface {
// Run cmd on the current target.
- Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
+ Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
// Use the given target for subsequent operations. The new
// target is the same host as the previous target, but it
@@ -75,9 +75,10 @@ func duration(conf arvados.Duration, def time.Duration) time.Duration {
//
// New instances are configured and set up according to the given
// cluster configuration.
-func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
+func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
wp := &Pool{
logger: logger,
+ arvClient: arvClient,
instanceSet: instanceSet,
newExecutor: newExecutor,
bootProbeCommand: cluster.CloudVMs.BootProbeCommand,
@@ -107,6 +108,7 @@ func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cl
type Pool struct {
// configuration
logger logrus.FieldLogger
+ arvClient *arvados.Client
instanceSet cloud.InstanceSet
newExecutor func(cloud.Instance) Executor
bootProbeCommand string
@@ -411,7 +413,7 @@ func (wp *Pool) kill(wkr *worker, uuid string) {
"Instance": wkr.instance,
})
logger.Debug("killing process")
- stdout, stderr, err := wkr.executor.Execute("crunch-run --kill 15 "+uuid, nil)
+ stdout, stderr, err := wkr.executor.Execute(nil, "crunch-run --kill 15 "+uuid, nil)
if err != nil {
logger.WithFields(logrus.Fields{
"stderr": string(stderr),
diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
index 3867e2c63..7a25db4a0 100644
--- a/lib/dispatchcloud/worker/pool_test.go
+++ b/lib/dispatchcloud/worker/pool_test.go
@@ -128,7 +128,7 @@ type stubExecutor struct{}
func (*stubExecutor) SetTarget(cloud.ExecutorTarget) {}
-func (*stubExecutor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
+func (*stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) ([]byte, []byte, error) {
return nil, nil, nil
}
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index c26186309..c15a9f9b0 100644
--- a/lib/dispatchcloud/worker/worker.go
+++ b/lib/dispatchcloud/worker/worker.go
@@ -86,7 +86,11 @@ func (wkr *worker) startContainer(ctr arvados.Container) {
wkr.starting[ctr.UUID] = struct{}{}
wkr.state = StateRunning
go func() {
- stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
+ env := map[string]string{
+ "ARVADOS_API_HOST": wkr.wp.arvClient.APIHost,
+ "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
+ }
+ stdout, stderr, err := wkr.executor.Execute(env, "crunch-run --detach '"+ctr.UUID+"'", nil)
wkr.mtx.Lock()
defer wkr.mtx.Unlock()
now := time.Now()
@@ -234,7 +238,7 @@ func (wkr *worker) probeAndUpdate() {
func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) {
cmd := "crunch-run --list"
- stdout, stderr, err := wkr.executor.Execute(cmd, nil)
+ stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
if err != nil {
wkr.logger.WithFields(logrus.Fields{
"Command": cmd,
@@ -255,7 +259,7 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
if cmd == "" {
cmd = "true"
}
- stdout, stderr, err := wkr.executor.Execute(cmd, nil)
+ stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
logger := wkr.logger.WithFields(logrus.Fields{
"Command": cmd,
"stdout": string(stdout),
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list