[ARVADOS] updated: 1.3.0-178-g1d80809a1
Git user
git at public.curoverse.com
Thu Jan 17 12:02:24 EST 2019
Summary of changes:
lib/dispatchcloud/dispatcher.go | 1 +
lib/dispatchcloud/ssh_executor/executor.go | 26 ++++++++++--
lib/dispatchcloud/ssh_executor/executor_test.go | 36 +++++++++++++++++
lib/dispatchcloud/worker/pool_test.go | 54 ++++++++++++++++++-------
sdk/go/arvados/config.go | 7 +++-
5 files changed, 106 insertions(+), 18 deletions(-)
via 1d80809a16fc97d7351824d2c921578133a93f65 (commit)
via a8db9566d6375d92b606b9ca59dfd92f22c41866 (commit)
from df9d7f8d64a2a51dea5552641f44afd3e43c636b (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 1d80809a16fc97d7351824d2c921578133a93f65
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Thu Jan 17 12:01:15 2019 -0500
14325: Configurable SSH target port for cloud VMs.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
index 788463330..ae954a2db 100644
--- a/lib/dispatchcloud/dispatcher.go
+++ b/lib/dispatchcloud/dispatcher.go
@@ -89,6 +89,7 @@ func (disp *dispatcher) Close() {
// Make a worker.Executor for the given instance.
func (disp *dispatcher) newExecutor(inst cloud.Instance) worker.Executor {
exr := ssh_executor.New(inst)
+ exr.SetTargetPort(disp.Cluster.CloudVMs.SSHPort)
exr.SetSigners(disp.sshKey)
return exr
}
diff --git a/lib/dispatchcloud/ssh_executor/executor.go b/lib/dispatchcloud/ssh_executor/executor.go
index 4b2478e94..d0fb54c54 100644
--- a/lib/dispatchcloud/ssh_executor/executor.go
+++ b/lib/dispatchcloud/ssh_executor/executor.go
@@ -36,9 +36,10 @@ func New(t cloud.ExecutorTarget) *Executor {
//
// An Executor must not be copied.
type Executor struct {
- target cloud.ExecutorTarget
- signers []ssh.Signer
- mtx sync.RWMutex // controls access to instance after creation
+ target cloud.ExecutorTarget
+ targetPort string
+ signers []ssh.Signer
+ mtx sync.RWMutex // controls access to instance after creation
client *ssh.Client
clientErr error
@@ -67,6 +68,17 @@ func (exr *Executor) SetTarget(t cloud.ExecutorTarget) {
exr.target = t
}
+// SetTargetPort sets the default port (name or number) to connect
+// to. This is used only when the address returned by the target's
+// Address() method does not specify a port. If the given port is
+// empty (or SetTargetPort is not called at all), the default port is
+// "ssh".
+func (exr *Executor) SetTargetPort(port string) {
+ exr.mtx.Lock()
+ defer exr.mtx.Unlock()
+ exr.targetPort = port
+}
+
// Target returns the current target.
func (exr *Executor) Target() cloud.ExecutorTarget {
exr.mtx.RLock()
@@ -167,6 +179,14 @@ func (exr *Executor) setupSSHClient() (*ssh.Client, error) {
if addr == "" {
return nil, errors.New("instance has no address")
}
+ if h, p, err := net.SplitHostPort(addr); err != nil || p == "" {
+ // Target address does not specify a port. Use
+ // targetPort, or "ssh".
+ if p = exr.targetPort; p == "" {
+ p = "ssh"
+ }
+ addr = net.JoinHostPort(h, p)
+ }
var receivedKey ssh.PublicKey
client, err := ssh.Dial("tcp", addr, &ssh.ClientConfig{
User: "root",
diff --git a/lib/dispatchcloud/ssh_executor/executor_test.go b/lib/dispatchcloud/ssh_executor/executor_test.go
index 619e47383..526840b13 100644
--- a/lib/dispatchcloud/ssh_executor/executor_test.go
+++ b/lib/dispatchcloud/ssh_executor/executor_test.go
@@ -8,6 +8,7 @@ import (
"bytes"
"io"
"io/ioutil"
+ "net"
"sync"
"testing"
"time"
@@ -32,6 +33,25 @@ func (*testTarget) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
return nil
}
+// Address returns the wrapped SSHService's host, with the port
+// stripped. This ensures the executor won't work until
+// SetTargetPort() is called -- see (*testTarget)Port().
+func (tt *testTarget) Address() string {
+ h, _, err := net.SplitHostPort(tt.SSHService.Address())
+ if err != nil {
+ panic(err)
+ }
+ return h
+}
+
+func (tt *testTarget) Port() string {
+ _, p, err := net.SplitHostPort(tt.SSHService.Address())
+ if err != nil {
+ panic(err)
+ }
+ return p
+}
+
type ExecutorSuite struct{}
func (s *ExecutorSuite) TestExecute(c *check.C) {
@@ -77,6 +97,22 @@ func (s *ExecutorSuite) TestExecute(c *check.C) {
exr := New(srv)
exr.SetSigners(clientpriv)
+ // Use the default target port (ssh). Execute will
+ // return a connection error or an authentication
+ // error, depending on whether the test host is
+ // running an SSH server.
+ _, _, err = exr.Execute(nil, command, nil)
+ c.Check(err, check.ErrorMatches, `.*(unable to authenticate|connection refused).*`)
+
+ // Use a bogus target port. Execute will return a
+ // connection error.
+ exr.SetTargetPort("0")
+ _, _, err = exr.Execute(nil, command, nil)
+ c.Check(err, check.ErrorMatches, `.*connection refused.*`)
+
+ // Use the test server's listening port.
+ exr.SetTargetPort(srv.Port())
+
done := make(chan bool)
go func() {
stdout, stderr, err := exr.Execute(map[string]string{"TESTVAR": "test value"}, command, bytes.NewBufferString(stdinData))
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index bfa86abf6..1154f922b 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -121,7 +121,12 @@ type CloudVMs struct {
// and ready to run containers, e.g., "mount | grep
// /encrypted-tmp"
BootProbeCommand string
- SyncInterval Duration
+
+ // Listening port (name or number) of SSH servers on worker
+ // VMs
+ SSHPort string
+
+ SyncInterval Duration
// Maximum idle time before automatic shutdown
TimeoutIdle Duration
commit a8db9566d6375d92b606b9ca59dfd92f22c41866
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Thu Jan 17 10:14:33 2019 -0500
14325: More testing for hold/drain states.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
index 7e84613eb..9c4542ea6 100644
--- a/lib/dispatchcloud/worker/pool_test.go
+++ b/lib/dispatchcloud/worker/pool_test.go
@@ -86,16 +86,19 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
})
// Place type3 node on admin-hold
- for _, instv := range pool.Instances() {
- if instv.ArvadosInstanceType == type3.Name {
- pool.SetIdleBehavior(instv.Instance, IdleBehaviorHold)
- break
- }
- }
+ ivs := suite.instancesByType(pool, type3)
+ c.Assert(ivs, check.HasLen, 1)
+ type3instanceID := ivs[0].Instance
+ err := pool.SetIdleBehavior(type3instanceID, IdleBehaviorHold)
+ c.Check(err, check.IsNil)
+
+ // Check admin-hold behavior: refuse to shut down, and don't
+ // report as Unallocated ("available now or soon").
c.Check(pool.Shutdown(type3), check.Equals, false)
suite.wait(c, pool, notify, func() bool {
return pool.Unallocated()[type3] == 0
})
+ c.Check(suite.instancesByType(pool, type3), check.HasLen, 1)
// Shutdown both type2 nodes
c.Check(pool.Shutdown(type2), check.Equals, true)
@@ -129,25 +132,48 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
c.Error("notify did not receive")
}
- // Place type3 node on admin-drain so it shuts down right away
- for _, instv := range pool.Instances() {
- if instv.ArvadosInstanceType == type3.Name {
- pool.SetIdleBehavior(instv.Instance, IdleBehaviorDrain)
- break
- }
- }
+ // Put type3 node back in service.
+ err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorRun)
+ c.Check(err, check.IsNil)
+ suite.wait(c, pool, notify, func() bool {
+ return pool.Unallocated()[type3] == 1
+ })
+
+ // Check admin-drain behavior: shut down right away, and don't
+ // report as Unallocated.
+ err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorDrain)
+ c.Check(err, check.IsNil)
suite.wait(c, pool, notify, func() bool {
return pool.Unallocated()[type3] == 0
})
+ suite.wait(c, pool, notify, func() bool {
+ ivs := suite.instancesByType(pool, type3)
+ return len(ivs) == 1 && ivs[0].WorkerState == StateShutdown.String()
+ })
- go lameInstanceSet.Release(4) // unblock Destroy calls
+ // Unblock all pending Destroy calls. Pool calls Destroy again
+ // if a node still appears in the provider list after a
+ // previous attempt, so there might be more than 4 Destroy
+ // calls to unblock.
+ go lameInstanceSet.Release(4444)
+ // Sync until all instances disappear from the provider list.
suite.wait(c, pool, notify, func() bool {
pool.getInstancesAndSync()
return len(pool.Instances()) == 0
})
}
+func (suite *PoolSuite) instancesByType(pool *Pool, it arvados.InstanceType) []InstanceView {
+ var ivs []InstanceView
+ for _, iv := range pool.Instances() {
+ if iv.ArvadosInstanceType == it.Name {
+ ivs = append(ivs, iv)
+ }
+ }
+ return ivs
+}
+
func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) {
timeout := time.NewTimer(time.Second).C
for !ready() {
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list