[ARVADOS] created: 1.3.0-138-g54643dadc

Git user git at public.curoverse.com
Fri Dec 21 15:40:01 EST 2018


        at  54643dadce1988a815b85a52349cb8a33cbdbf65 (commit)


commit 54643dadce1988a815b85a52349cb8a33cbdbf65
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Dec 20 16:09:32 2018 -0500

    14325: Add worker state diagram.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/readme_states.txt b/lib/dispatchcloud/readme_states.txt
new file mode 100644
index 000000000..c78df3bce
--- /dev/null
+++ b/lib/dispatchcloud/readme_states.txt
@@ -0,0 +1,27 @@
+# cpan -I -T install Graph::Easy
+# (eval `perl -I ~/perl5/lib/perl5 -Mlocal::lib`; cpan -T install Graph::Easy)
+# graph-easy --as=svg < readme_states.txt
+
+[Nonexistent] - appears in cloud list -> [Unknown]
+[Nonexistent] - create() returns ID -> [Booting]
+[Unknown] - create() returns ID -> [Booting]
+[Unknown] - boot timeout -> [Shutdown]
+[Booting] - boot+run probes succeed -> [Idle]
+[Idle] - idle timeout -> [Shutdown]
+[Idle] - probe timeout -> [Shutdown]
+[Idle] - want=drain -> [Shutdown]
+[Idle] - container starts -> [Running]
+[Running] - container ends -> [Idle]
+[Running] - container ends, want=drain -> [Shutdown]
+[Shutdown] - instance disappears from cloud -> [Gone]
+
+# Layouter fails if we add these
+#[Hold] - want=run -> [Booting]
+#[Hold] - want=drain -> [Shutdown]
+#[Running] - container ends, want=hold -> [Hold]
+#[Unknown] - want=hold -> [Hold]
+#[Booting] - want=hold -> [Hold]
+#[Idle] - want=hold -> [Hold]
+
+# Not worth saying?
+#[Booting] - boot probe succeeds, run probe fails -> [Booting]

commit de12a680c6cd6395a700aca5d03604465afa3df5
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Tue Dec 18 15:50:17 2018 -0500

    14325: Propagate API env vars to crunch-run.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
index 81ad0ed3f..2b7fd502f 100644
--- a/lib/dispatchcloud/dispatcher.go
+++ b/lib/dispatchcloud/dispatcher.go
@@ -127,7 +127,7 @@ func (disp *dispatcher) initialize() {
 	}
 	disp.instanceSet = &instanceSetProxy{instanceSet}
 	disp.reg = prometheus.NewRegistry()
-	disp.pool = worker.NewPool(disp.logger, disp.reg, disp.instanceSet, disp.newExecutor, disp.Cluster)
+	disp.pool = worker.NewPool(disp.logger, arvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.Cluster)
 	disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, arvClient)
 
 	if disp.Cluster.ManagementToken == "" {
diff --git a/lib/dispatchcloud/ssh_executor/executor.go b/lib/dispatchcloud/ssh_executor/executor.go
index b5dba9870..4b2478e94 100644
--- a/lib/dispatchcloud/ssh_executor/executor.go
+++ b/lib/dispatchcloud/ssh_executor/executor.go
@@ -76,12 +76,18 @@ func (exr *Executor) Target() cloud.ExecutorTarget {
 
 // Execute runs cmd on the target. If an existing connection is not
 // usable, it sets up a new connection to the current target.
-func (exr *Executor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
+func (exr *Executor) Execute(env map[string]string, cmd string, stdin io.Reader) ([]byte, []byte, error) {
 	session, err := exr.newSession()
 	if err != nil {
 		return nil, nil, err
 	}
 	defer session.Close()
+	for k, v := range env {
+		err = session.Setenv(k, v)
+		if err != nil {
+			return nil, nil, err
+		}
+	}
 	var stdout, stderr bytes.Buffer
 	session.Stdin = stdin
 	session.Stdout = &stdout
diff --git a/lib/dispatchcloud/ssh_executor/executor_test.go b/lib/dispatchcloud/ssh_executor/executor_test.go
index 8dabfecad..619e47383 100644
--- a/lib/dispatchcloud/ssh_executor/executor_test.go
+++ b/lib/dispatchcloud/ssh_executor/executor_test.go
@@ -42,7 +42,8 @@ func (s *ExecutorSuite) TestExecute(c *check.C) {
 	for _, exitcode := range []int{0, 1, 2} {
 		srv := &testTarget{
 			SSHService: test.SSHService{
-				Exec: func(cmd string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+				Exec: func(env map[string]string, cmd string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+					c.Check(env["TESTVAR"], check.Equals, "test value")
 					c.Check(cmd, check.Equals, command)
 					var wg sync.WaitGroup
 					wg.Add(2)
@@ -78,7 +79,7 @@ func (s *ExecutorSuite) TestExecute(c *check.C) {
 
 		done := make(chan bool)
 		go func() {
-			stdout, stderr, err := exr.Execute(command, bytes.NewBufferString(stdinData))
+			stdout, stderr, err := exr.Execute(map[string]string{"TESTVAR": "test value"}, command, bytes.NewBufferString(stdinData))
 			if exitcode == 0 {
 				c.Check(err, check.IsNil)
 			} else {
diff --git a/lib/dispatchcloud/test/ssh_service.go b/lib/dispatchcloud/test/ssh_service.go
index b1e4e03b1..ed5995f4c 100644
--- a/lib/dispatchcloud/test/ssh_service.go
+++ b/lib/dispatchcloud/test/ssh_service.go
@@ -32,7 +32,7 @@ func LoadTestKey(c *check.C, fnm string) (ssh.PublicKey, ssh.Signer) {
 
 // An SSHExecFunc handles an "exec" session on a multiplexed SSH
 // connection.
-type SSHExecFunc func(command string, stdin io.Reader, stdout, stderr io.Writer) uint32
+type SSHExecFunc func(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32
 
 // An SSHService accepts SSH connections on an available TCP port and
 // passes clients' "exec" sessions to the provided SSHExecFunc.
@@ -146,22 +146,37 @@ func (ss *SSHService) serveConn(nConn net.Conn, config *ssh.ServerConfig) {
 			log.Printf("accept channel: %s", err)
 			return
 		}
-		var execReq struct {
-			Command string
-		}
+		didExec := false
+		sessionEnv := map[string]string{}
 		go func() {
 			for req := range reqs {
-				if req.Type == "exec" && execReq.Command == "" {
+				switch {
+				case didExec:
+					// Reject anything after exec
+					req.Reply(false, nil)
+				case req.Type == "exec":
+					var execReq struct {
+						Command string
+					}
 					req.Reply(true, nil)
 					ssh.Unmarshal(req.Payload, &execReq)
 					go func() {
 						var resp struct {
 							Status uint32
 						}
-						resp.Status = ss.Exec(execReq.Command, ch, ch, ch.Stderr())
+						resp.Status = ss.Exec(sessionEnv, execReq.Command, ch, ch, ch.Stderr())
 						ch.SendRequest("exit-status", false, ssh.Marshal(&resp))
 						ch.Close()
 					}()
+					didExec = true
+				case req.Type == "env":
+					var envReq struct {
+						Name  string
+						Value string
+					}
+					req.Reply(true, nil)
+					ssh.Unmarshal(req.Payload, &envReq)
+					sessionEnv[envReq.Name] = envReq.Value
 				}
 			}
 		}()
diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
index 8bdfaa947..136286b3d 100644
--- a/lib/dispatchcloud/test/stub_driver.go
+++ b/lib/dispatchcloud/test/stub_driver.go
@@ -155,7 +155,7 @@ func (svm *StubVM) Instance() stubInstance {
 	}
 }
 
-func (svm *StubVM) Exec(command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
 	queue := svm.sis.driver.Queue
 	uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
 	if eta := svm.Boot.Sub(time.Now()); eta > 0 {
@@ -171,6 +171,12 @@ func (svm *StubVM) Exec(command string, stdin io.Reader, stdout, stderr io.Write
 		return 1
 	}
 	if strings.HasPrefix(command, "crunch-run --detach ") {
+		for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
+			if env[name] == "" {
+				fmt.Fprintf(stderr, "%s missing from environment %q\n", name, env)
+				return 1
+			}
+		}
 		svm.Lock()
 		if svm.running == nil {
 			svm.running = map[string]bool{}
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index ff5f762c1..50e0c066a 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -36,7 +36,7 @@ type InstanceView struct {
 // An Executor executes shell commands on a remote host.
 type Executor interface {
 	// Run cmd on the current target.
-	Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
+	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
 
 	// Use the given target for subsequent operations. The new
 	// target is the same host as the previous target, but it
@@ -75,9 +75,10 @@ func duration(conf arvados.Duration, def time.Duration) time.Duration {
 //
 // New instances are configured and set up according to the given
 // cluster configuration.
-func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
+func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
 	wp := &Pool{
 		logger:             logger,
+		arvClient:          arvClient,
 		instanceSet:        instanceSet,
 		newExecutor:        newExecutor,
 		bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
@@ -107,6 +108,7 @@ func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cl
 type Pool struct {
 	// configuration
 	logger             logrus.FieldLogger
+	arvClient          *arvados.Client
 	instanceSet        cloud.InstanceSet
 	newExecutor        func(cloud.Instance) Executor
 	bootProbeCommand   string
@@ -411,7 +413,7 @@ func (wp *Pool) kill(wkr *worker, uuid string) {
 		"Instance":      wkr.instance,
 	})
 	logger.Debug("killing process")
-	stdout, stderr, err := wkr.executor.Execute("crunch-run --kill 15 "+uuid, nil)
+	stdout, stderr, err := wkr.executor.Execute(nil, "crunch-run --kill 15 "+uuid, nil)
 	if err != nil {
 		logger.WithFields(logrus.Fields{
 			"stderr": string(stderr),
diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
index 3867e2c63..7a25db4a0 100644
--- a/lib/dispatchcloud/worker/pool_test.go
+++ b/lib/dispatchcloud/worker/pool_test.go
@@ -128,7 +128,7 @@ type stubExecutor struct{}
 
 func (*stubExecutor) SetTarget(cloud.ExecutorTarget) {}
 
-func (*stubExecutor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
+func (*stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) ([]byte, []byte, error) {
 	return nil, nil, nil
 }
 
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index c26186309..c15a9f9b0 100644
--- a/lib/dispatchcloud/worker/worker.go
+++ b/lib/dispatchcloud/worker/worker.go
@@ -86,7 +86,11 @@ func (wkr *worker) startContainer(ctr arvados.Container) {
 	wkr.starting[ctr.UUID] = struct{}{}
 	wkr.state = StateRunning
 	go func() {
-		stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
+		env := map[string]string{
+			"ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
+			"ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
+		}
+		stdout, stderr, err := wkr.executor.Execute(env, "crunch-run --detach '"+ctr.UUID+"'", nil)
 		wkr.mtx.Lock()
 		defer wkr.mtx.Unlock()
 		now := time.Now()
@@ -234,7 +238,7 @@ func (wkr *worker) probeAndUpdate() {
 
 func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) {
 	cmd := "crunch-run --list"
-	stdout, stderr, err := wkr.executor.Execute(cmd, nil)
+	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
 	if err != nil {
 		wkr.logger.WithFields(logrus.Fields{
 			"Command": cmd,
@@ -255,7 +259,7 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
 	if cmd == "" {
 		cmd = "true"
 	}
-	stdout, stderr, err := wkr.executor.Execute(cmd, nil)
+	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
 	logger := wkr.logger.WithFields(logrus.Fields{
 		"Command": cmd,
 		"stdout":  string(stdout),

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list