[ARVADOS] created: 1.3.0-1124-g0684f84ec
From: Git user <git@public.curoverse.com>
Date: Fri Jun 21 20:05:20 UTC 2019
at 0684f84ec50069b24b4564474db67d6b44266699 (commit)
commit 0684f84ec50069b24b4564474db67d6b44266699
Author: Tom Clegg <tclegg@veritasgenetics.com>
Date: Fri Jun 21 16:01:38 2019 -0400
15026: Add comments.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>
diff --git a/lib/cloud/cloudtest/tester.go b/lib/cloud/cloudtest/tester.go
index 81b14aed8..33ff264bc 100644
--- a/lib/cloud/cloudtest/tester.go
+++ b/lib/cloud/cloudtest/tester.go
@@ -68,11 +68,17 @@ func (t *tester) Run() bool {
t.Logger.WithError(err).Info("error initializing driver")
return false
}
+
+ // Don't send the driver any filters the first time we get the
+ // instance list. This way we can log an instance count
+ // (N=...) that includes all instances in this service
+ // account, even if they don't have the same InstanceSetID.
insts, err := t.getInstances(nil)
if err != nil {
t.Logger.WithError(err).Info("error getting initial list of instances")
return false
}
+
for {
foundExisting := false
for _, i := range insts {
@@ -129,6 +135,9 @@ func (t *tester) Run() bool {
}).Info("creating instance")
inst, err := t.is.Create(t.InstanceType, t.ImageID, tags, initCommand, t.SSHKey.PublicKey())
if err != nil {
+ // Create() might have returned an error due to a bug or
+ // network problem even though the instance was actually
+ // created, so it's safer to wait a bit for an instance to
+ // appear.
deferredError = true
t.Logger.WithError(err).Error("error creating test instance")
t.Logger.WithField("Deadline", bootDeadline).Info("waiting for instance to appear anyway, in case the Create response was incorrect")
@@ -143,6 +152,8 @@ func (t *tester) Run() bool {
t.Logger.WithField("Instance", t.testInstance.ID()).Info("new instance appeared")
t.showLoginInfo()
} else {
+ // Create() succeeded. Make sure the new instance
+ // appears right away in the Instances() list.
t.Logger.WithField("Instance", inst.ID()).Info("created instance")
t.testInstance = inst
t.showLoginInfo()
@@ -182,6 +193,11 @@ func (t *tester) Run() bool {
return !deferredError
}
+// If the test instance has an address, log an "ssh user@host" command
+// line that the operator can paste into another terminal, and set
+// t.showedLoginInfo.
+//
+// If the test instance doesn't have an address yet, do nothing.
func (t *tester) showLoginInfo() {
t.updateExecutor()
host, port := t.executor.TargetHostPort()
@@ -225,6 +241,10 @@ func (t *tester) refreshTestInstance() error {
return errTestInstanceNotFound
}
+// Get the list of instances, passing the given tags to the cloud
+// driver to filter results.
+//
+// Return only the instances that have our InstanceSetID tag.
func (t *tester) getInstances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
var ret []cloud.Instance
t.Logger.WithField("FilterTags", tags).Info("getting instance list")
@@ -241,6 +261,8 @@ func (t *tester) getInstances(tags cloud.InstanceTags) ([]cloud.Instance, error)
return ret, nil
}
+// Check that t.testInstance has every tag in t.Tags. If not, log an
+// error and return false.
func (t *tester) checkTags() bool {
ok := true
for k, v := range t.Tags {
@@ -259,6 +281,8 @@ func (t *tester) checkTags() bool {
return ok
}
+// Run t.BootProbeCommand on t.testInstance until it succeeds or the
+// deadline arrives.
func (t *tester) waitForBoot(deadline time.Time) bool {
for time.Now().Before(deadline) {
err := t.runShellCommand(t.BootProbeCommand)
@@ -272,6 +296,8 @@ func (t *tester) waitForBoot(deadline time.Time) bool {
return false
}
+// Create t.executor and/or update its target to t.testInstance's
+// current address.
func (t *tester) updateExecutor() {
if t.executor == nil {
t.executor = ssh_executor.New(t.testInstance)
diff --git a/lib/dispatchcloud/driver.go b/lib/dispatchcloud/driver.go
index 49437e318..8d09d6a53 100644
--- a/lib/dispatchcloud/driver.go
+++ b/lib/dispatchcloud/driver.go
@@ -17,6 +17,9 @@ import (
"golang.org/x/crypto/ssh"
)
+// Map of available cloud drivers.
+// Clusters.*.Containers.CloudVMs.Driver configuration values
+// correspond to keys in this map.
var Drivers = map[string]cloud.Driver{
"azure": azure.Driver,
"ec2": ec2.Driver,
commit 6d87ae37bbc00cbb6fffb47ade3ec824b501b3df
Author: Tom Clegg <tclegg@veritasgenetics.com>
Date: Fri Jun 21 14:09:20 2019 -0400
15026: Add cloudtest command.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>
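The new subcommand, registered in cmd/arvados-server below, drives a
full create/boot/probe/destroy cycle against the cluster's configured
cloud driver. Based on the flags defined in lib/cloud/cloudtest/cmd.go,
a typical invocation might look like this (the config path is
illustrative; -config defaults to arvados.DefaultConfigFile):

    arvados-server cloudtest -config /etc/arvados/config.yml -destroy-existing -command 'docker ps -q'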
diff --git a/build/run-tests.sh b/build/run-tests.sh
index 97c21b405..6f8f11774 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -80,6 +80,7 @@ lib/controller
lib/crunchstat
lib/cloud
lib/cloud/azure
+lib/cloud/cloudtest
lib/dispatchcloud
lib/dispatchcloud/container
lib/dispatchcloud/scheduler
@@ -734,7 +735,7 @@ do_test() {
services/api)
stop_services
;;
- gofmt | govendor | doc | lib/cli | lib/cloud/azure | lib/cloud/ec2 | lib/cmd | lib/dispatchcloud/ssh_executor | lib/dispatchcloud/worker)
+ gofmt | govendor | doc | lib/cli | lib/cloud/azure | lib/cloud/ec2 | lib/cloud/cloudtest | lib/cmd | lib/dispatchcloud/ssh_executor | lib/dispatchcloud/worker)
# don't care whether services are running
;;
*)
@@ -992,6 +993,7 @@ gostuff=(
lib/cloud
lib/cloud/azure
lib/cloud/ec2
+ lib/cloud/cloudtest
lib/config
lib/dispatchcloud
lib/dispatchcloud/container
diff --git a/cmd/arvados-server/cmd.go b/cmd/arvados-server/cmd.go
index 983159382..2506bd2c9 100644
--- a/cmd/arvados-server/cmd.go
+++ b/cmd/arvados-server/cmd.go
@@ -7,6 +7,7 @@ package main
import (
"os"
+ "git.curoverse.com/arvados.git/lib/cloud/cloudtest"
"git.curoverse.com/arvados.git/lib/cmd"
"git.curoverse.com/arvados.git/lib/config"
"git.curoverse.com/arvados.git/lib/controller"
@@ -20,6 +21,7 @@ var (
"-version": cmd.Version(version),
"--version": cmd.Version(version),
+ "cloudtest": cloudtest.Command,
"config-check": config.CheckCommand,
"config-dump": config.DumpCommand,
"controller": controller.Command,
diff --git a/lib/cloud/cloudtest/cmd.go b/lib/cloud/cloudtest/cmd.go
new file mode 100644
index 000000000..1f94ea6da
--- /dev/null
+++ b/lib/cloud/cloudtest/cmd.go
@@ -0,0 +1,153 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package cloudtest
+
+import (
+ "bufio"
+ "errors"
+ "flag"
+ "fmt"
+ "io"
+ "os"
+ "os/user"
+
+ "git.curoverse.com/arvados.git/lib/cloud"
+ "git.curoverse.com/arvados.git/lib/config"
+ "git.curoverse.com/arvados.git/lib/dispatchcloud"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+ "golang.org/x/crypto/ssh"
+)
+
+var Command command
+
+type command struct{}
+
+func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+ var err error
+ defer func() {
+ if err != nil {
+ fmt.Fprintf(stderr, "%s\n", err)
+ }
+ }()
+
+ flags := flag.NewFlagSet("", flag.ContinueOnError)
+ flags.SetOutput(stderr)
+ configFile := flags.String("config", arvados.DefaultConfigFile, "Site configuration `file`")
+ instanceSetID := flags.String("instance-set-id", defaultInstanceSetID(), "InstanceSetID tag `value` to use on the test instance")
+ imageID := flags.String("image-id", "", "Image ID to use when creating the test instance (if empty, use cluster config)")
+ instanceType := flags.String("instance-type", "", "Instance type to create (if empty, use cheapest type in config)")
+ destroyExisting := flags.Bool("destroy-existing", false, "Destroy any existing instances tagged with our InstanceSetID, instead of erroring out")
+ shellCommand := flags.String("command", "", "Run an interactive shell command on the test instance when it boots")
+ pauseBeforeDestroy := flags.Bool("pause-before-destroy", false, "Prompt and wait before destroying the test instance")
+ err = flags.Parse(args)
+ if err == flag.ErrHelp {
+ err = nil
+ return 0
+ } else if err != nil {
+ return 2
+ }
+
+ if len(flags.Args()) != 0 {
+ flags.Usage()
+ return 2
+ }
+ logger := ctxlog.New(stderr, "text", "info")
+ defer func() {
+ if err != nil {
+ logger.WithError(err).Error("fatal")
+ // suppress output from the other error-printing func
+ err = nil
+ }
+ logger.Info("exiting")
+ }()
+
+ cfg, err := config.LoadFile(*configFile, logger)
+ if err != nil {
+ return 1
+ }
+ cluster, err := cfg.GetCluster("")
+ if err != nil {
+ return 1
+ }
+ key, err := ssh.ParsePrivateKey([]byte(cluster.Containers.DispatchPrivateKey))
+ if err != nil {
+ err = fmt.Errorf("error parsing configured Containers.DispatchPrivateKey: %s", err)
+ return 1
+ }
+ driver, ok := dispatchcloud.Drivers[cluster.Containers.CloudVMs.Driver]
+ if !ok {
+ err = fmt.Errorf("unsupported cloud driver %q", cluster.Containers.CloudVMs.Driver)
+ return 1
+ }
+ if *imageID == "" {
+ *imageID = cluster.Containers.CloudVMs.ImageID
+ }
+ it, err := chooseInstanceType(cluster, *instanceType)
+ if err != nil {
+ return 1
+ }
+ tags := cloud.SharedResourceTags(cluster.Containers.CloudVMs.ResourceTags)
+ tagKeyPrefix := cluster.Containers.CloudVMs.TagKeyPrefix
+ tags[tagKeyPrefix+"CloudTestPID"] = fmt.Sprintf("%d", os.Getpid())
+ if !(&tester{
+ Logger: logger,
+ Tags: tags,
+ TagKeyPrefix: tagKeyPrefix,
+ SetID: cloud.InstanceSetID(*instanceSetID),
+ DestroyExisting: *destroyExisting,
+ ProbeInterval: cluster.Containers.CloudVMs.ProbeInterval.Duration(),
+ SyncInterval: cluster.Containers.CloudVMs.SyncInterval.Duration(),
+ TimeoutBooting: cluster.Containers.CloudVMs.TimeoutBooting.Duration(),
+ Driver: driver,
+ DriverParameters: cluster.Containers.CloudVMs.DriverParameters,
+ ImageID: cloud.ImageID(*imageID),
+ InstanceType: it,
+ SSHKey: key,
+ SSHPort: cluster.Containers.CloudVMs.SSHPort,
+ BootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
+ ShellCommand: *shellCommand,
+ PauseBeforeDestroy: func() {
+ if *pauseBeforeDestroy {
+ logger.Info("waiting for operator to press Enter")
+ fmt.Fprint(stderr, "Press Enter to continue: ")
+ bufio.NewReader(stdin).ReadString('\n')
+ }
+ },
+ }).Run() {
+ return 1
+ }
+ return 0
+}
+
+func defaultInstanceSetID() string {
+ username := ""
+ if u, err := user.Current(); err == nil {
+ username = u.Username
+ }
+ hostname, _ := os.Hostname()
+ return fmt.Sprintf("cloudtest-%s@%s", username, hostname)
+}
+
+// Return the named instance type, or the cheapest type if name=="".
+func chooseInstanceType(cluster *arvados.Cluster, name string) (arvados.InstanceType, error) {
+ if len(cluster.InstanceTypes) == 0 {
+ return arvados.InstanceType{}, errors.New("no instance types are configured")
+ } else if name == "" {
+ first := true
+ var best arvados.InstanceType
+ for _, it := range cluster.InstanceTypes {
+ if first || best.Price > it.Price {
+ best = it
+ first = false
+ }
+ }
+ return best, nil
+ } else if it, ok := cluster.InstanceTypes[name]; !ok {
+ return it, fmt.Errorf("requested instance type %q is not configured", name)
+ } else {
+ return it, nil
+ }
+}
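A hypothetical test, not part of this commit, sketching how
chooseInstanceType's selection rule could be pinned down (the suite in
tester_test.go below covers the tester itself):

    func (s *TesterSuite) TestChooseInstanceType(c *check.C) {
        cluster := &arvados.Cluster{
            InstanceTypes: arvados.InstanceTypeMap{
                "small": arvados.InstanceType{Name: "small", Price: 0.01},
                "big":   arvados.InstanceType{Name: "big", Price: 0.10},
            },
        }
        // An empty name selects the cheapest configured type.
        it, err := chooseInstanceType(cluster, "")
        c.Check(err, check.IsNil)
        c.Check(it.Name, check.Equals, "small")
        // A name that isn't configured is an error.
        _, err = chooseInstanceType(cluster, "nonexistent")
        c.Check(err, check.NotNil)
    }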
diff --git a/lib/cloud/cloudtest/tester.go b/lib/cloud/cloudtest/tester.go
new file mode 100644
index 000000000..81b14aed8
--- /dev/null
+++ b/lib/cloud/cloudtest/tester.go
@@ -0,0 +1,352 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package cloudtest
+
+import (
+ "crypto/rand"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "time"
+
+ "git.curoverse.com/arvados.git/lib/cloud"
+ "git.curoverse.com/arvados.git/lib/dispatchcloud/ssh_executor"
+ "git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "github.com/sirupsen/logrus"
+ "golang.org/x/crypto/ssh"
+)
+
+var (
+ errTestInstanceNotFound = errors.New("test instance missing from cloud provider's list")
+)
+
+// A tester does a sequence of operations to test a cloud driver and
+// configuration. Run() should be called only once, after assigning
+// suitable values to public fields.
+type tester struct {
+ Logger logrus.FieldLogger
+ Tags cloud.SharedResourceTags
+ TagKeyPrefix string
+ SetID cloud.InstanceSetID
+ DestroyExisting bool
+ ProbeInterval time.Duration
+ SyncInterval time.Duration
+ TimeoutBooting time.Duration
+ Driver cloud.Driver
+ DriverParameters json.RawMessage
+ InstanceType arvados.InstanceType
+ ImageID cloud.ImageID
+ SSHKey ssh.Signer
+ SSHPort string
+ BootProbeCommand string
+ ShellCommand string
+ PauseBeforeDestroy func()
+
+ is cloud.InstanceSet
+ testInstance cloud.Instance
+ secret string
+ executor *ssh_executor.Executor
+ showedLoginInfo bool
+
+ failed bool
+}
+
+// Run the test suite as specified, clean up as needed, and return
+// true (everything is OK) or false (something went wrong).
+func (t *tester) Run() bool {
+ // This flag gets set when we encounter a non-fatal error, so
+ // we can continue doing more tests but remember to return
+ // false (failure) at the end.
+ deferredError := false
+
+ var err error
+ t.is, err = t.Driver.InstanceSet(t.DriverParameters, t.SetID, t.Tags, t.Logger)
+ if err != nil {
+ t.Logger.WithError(err).Info("error initializing driver")
+ return false
+ }
+ insts, err := t.getInstances(nil)
+ if err != nil {
+ t.Logger.WithError(err).Info("error getting initial list of instances")
+ return false
+ }
+ for {
+ foundExisting := false
+ for _, i := range insts {
+ if i.Tags()[t.TagKeyPrefix+"InstanceSetID"] != string(t.SetID) {
+ continue
+ }
+ lgr := t.Logger.WithFields(logrus.Fields{
+ "Instance": i.ID(),
+ "InstanceSetID": t.SetID,
+ })
+ foundExisting = true
+ if t.DestroyExisting {
+ lgr.Info("destroying existing instance with our InstanceSetID")
+ err := i.Destroy()
+ if err != nil {
+ lgr.WithError(err).Error("error destroying existing instance")
+ } else {
+ lgr.Info("Destroy() call succeeded")
+ }
+ } else {
+ lgr.Error("found existing instance with our InstanceSetID")
+ }
+ }
+ if !foundExisting {
+ break
+ } else if t.DestroyExisting {
+ t.sleepSyncInterval()
+ } else {
+ t.Logger.Error("cannot continue with existing instances -- clean up manually, use -destroy-existing=true, or choose a different -instance-set-id")
+ return false
+ }
+ }
+
+ t.secret = randomHex(40)
+
+ tags := cloud.InstanceTags{}
+ for k, v := range t.Tags {
+ tags[k] = v
+ }
+ tags[t.TagKeyPrefix+"InstanceSetID"] = string(t.SetID)
+ tags[t.TagKeyPrefix+"InstanceSecret"] = t.secret
+
+ defer t.destroyTestInstance()
+
+ bootDeadline := time.Now().Add(t.TimeoutBooting)
+ initCommand := worker.TagVerifier{nil, t.secret}.InitCommand()
+
+ t.Logger.WithFields(logrus.Fields{
+ "InstanceType": t.InstanceType.Name,
+ "ProviderInstanceType": t.InstanceType.ProviderType,
+ "ImageID": t.ImageID,
+ "Tags": tags,
+ "InitCommand": initCommand,
+ }).Info("creating instance")
+ inst, err := t.is.Create(t.InstanceType, t.ImageID, tags, initCommand, t.SSHKey.PublicKey())
+ if err != nil {
+ deferredError = true
+ t.Logger.WithError(err).Error("error creating test instance")
+ t.Logger.WithField("Deadline", bootDeadline).Info("waiting for instance to appear anyway, in case the Create response was incorrect")
+ for err = t.refreshTestInstance(); err != nil; err = t.refreshTestInstance() {
+ if time.Now().After(bootDeadline) {
+ t.Logger.Error("timed out")
+ return false
+ } else {
+ t.sleepSyncInterval()
+ }
+ }
+ t.Logger.WithField("Instance", t.testInstance.ID()).Info("new instance appeared")
+ t.showLoginInfo()
+ } else {
+ t.Logger.WithField("Instance", inst.ID()).Info("created instance")
+ t.testInstance = inst
+ t.showLoginInfo()
+ err = t.refreshTestInstance()
+ if err == errTestInstanceNotFound {
+ t.Logger.WithError(err).Error("cloud/driver Create succeeded, but instance is not in list")
+ deferredError = true
+ } else if err != nil {
+ t.Logger.WithError(err).Error("error getting list of instances")
+ return false
+ }
+ }
+ t.testInstance = worker.TagVerifier{t.testInstance, t.secret}
+
+ if !t.checkTags() {
+ // checkTags() already logged the errors
+ return false
+ }
+
+ if !t.waitForBoot(bootDeadline) {
+ return false
+ }
+
+ if t.ShellCommand != "" {
+ err = t.runShellCommand(t.ShellCommand)
+ if err != nil {
+ t.Logger.WithError(err).Error("shell command failed")
+ deferredError = true
+ }
+ }
+
+ if fn := t.PauseBeforeDestroy; fn != nil {
+ t.showLoginInfo()
+ fn()
+ }
+
+ return !deferredError
+}
+
+func (t *tester) showLoginInfo() {
+ t.updateExecutor()
+ host, port := t.executor.TargetHostPort()
+ if host == "" {
+ return
+ }
+ user := t.testInstance.RemoteUser()
+ t.Logger.WithField("Command", fmt.Sprintf("ssh -p%s %s@%s", port, user, host)).Info("showing login information")
+ t.showedLoginInfo = true
+}
+
+// Get the latest instance list from the driver. If our test instance
+// is found, assign it to t.testInstance.
+func (t *tester) refreshTestInstance() error {
+ insts, err := t.getInstances(cloud.InstanceTags{t.TagKeyPrefix + "InstanceSetID": string(t.SetID)})
+ if err != nil {
+ return err
+ }
+ for _, i := range insts {
+ if t.testInstance == nil {
+ // Filter by InstanceSetID tag value
+ if i.Tags()[t.TagKeyPrefix+"InstanceSetID"] != string(t.SetID) {
+ continue
+ }
+ } else {
+ // Filter by instance ID
+ if i.ID() != t.testInstance.ID() {
+ continue
+ }
+ }
+ t.Logger.WithFields(logrus.Fields{
+ "Instance": i.ID(),
+ "Address": i.Address(),
+ }).Info("found our instance in returned list")
+ t.testInstance = worker.TagVerifier{i, t.secret}
+ if !t.showedLoginInfo {
+ t.showLoginInfo()
+ }
+ return nil
+ }
+ return errTestInstanceNotFound
+}
+
+func (t *tester) getInstances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
+ var ret []cloud.Instance
+ t.Logger.WithField("FilterTags", tags).Info("getting instance list")
+ insts, err := t.is.Instances(tags)
+ if err != nil {
+ return nil, err
+ }
+ t.Logger.WithField("N", len(insts)).Info("got instance list")
+ for _, i := range insts {
+ if i.Tags()[t.TagKeyPrefix+"InstanceSetID"] == string(t.SetID) {
+ ret = append(ret, i)
+ }
+ }
+ return ret, nil
+}
+
+func (t *tester) checkTags() bool {
+ ok := true
+ for k, v := range t.Tags {
+ if got := t.testInstance.Tags()[k]; got != v {
+ ok = false
+ t.Logger.WithFields(logrus.Fields{
+ "Key": k,
+ "ExpectedValue": v,
+ "GotValue": got,
+ }).Error("tag is missing from test instance")
+ }
+ }
+ if ok {
+ t.Logger.Info("all expected tags are present")
+ }
+ return ok
+}
+
+func (t *tester) waitForBoot(deadline time.Time) bool {
+ for time.Now().Before(deadline) {
+ err := t.runShellCommand(t.BootProbeCommand)
+ if err == nil {
+ return true
+ }
+ t.sleepProbeInterval()
+ t.refreshTestInstance()
+ }
+ t.Logger.Error("timed out")
+ return false
+}
+
+func (t *tester) updateExecutor() {
+ if t.executor == nil {
+ t.executor = ssh_executor.New(t.testInstance)
+ t.executor.SetTargetPort(t.SSHPort)
+ t.executor.SetSigners(t.SSHKey)
+ } else {
+ t.executor.SetTarget(t.testInstance)
+ }
+}
+
+func (t *tester) runShellCommand(cmd string) error {
+ t.updateExecutor()
+ t.Logger.WithFields(logrus.Fields{
+ "Command": cmd,
+ }).Info("executing remote command")
+ stdout, stderr, err := t.executor.Execute(nil, cmd, nil)
+ lgr := t.Logger.WithFields(logrus.Fields{
+ "Command": cmd,
+ "stdout": string(stdout),
+ "stderr": string(stderr),
+ })
+ if err != nil {
+ lgr.WithError(err).Info("remote command failed")
+ } else {
+ lgr.Info("remote command succeeded")
+ }
+ return err
+}
+
+// Currently, this retries forever until it can return true (success).
+func (t *tester) destroyTestInstance() bool {
+ if t.testInstance == nil {
+ return true
+ }
+ for {
+ t.Logger.WithField("Instance", t.testInstance.ID()).Info("destroying instance")
+ err := t.testInstance.Destroy()
+ if err != nil {
+ t.Logger.WithError(err).WithField("Instance", t.testInstance.ID()).Error("error destroying instance")
+ } else {
+ t.Logger.WithField("Instance", t.testInstance.ID()).Info("destroyed instance")
+ }
+ err = t.refreshTestInstance()
+ if err == errTestInstanceNotFound {
+ t.Logger.WithField("Instance", t.testInstance.ID()).Info("instance no longer appears in list")
+ t.testInstance = nil
+ return true
+ } else if err == nil {
+ t.Logger.WithField("Instance", t.testInstance.ID()).Info("instance still exists after calling Destroy")
+ t.sleepSyncInterval()
+ continue
+ } else {
+ t.Logger.WithError(err).Error("error getting list of instances")
+ continue
+ }
+ }
+}
+
+func (t *tester) sleepSyncInterval() {
+ t.Logger.WithField("Duration", t.SyncInterval).Info("waiting SyncInterval")
+ time.Sleep(t.SyncInterval)
+}
+
+func (t *tester) sleepProbeInterval() {
+ t.Logger.WithField("Duration", t.ProbeInterval).Info("waiting ProbeInterval")
+ time.Sleep(t.ProbeInterval)
+}
+
+// Return a random string of n hexadecimal digits (n*4 random bits). n
+// must be even.
+func randomHex(n int) string {
+ buf := make([]byte, n/2)
+ _, err := rand.Read(buf)
+ if err != nil {
+ panic(err)
+ }
+ return fmt.Sprintf("%x", buf)
+}
diff --git a/lib/cloud/cloudtest/tester_test.go b/lib/cloud/cloudtest/tester_test.go
new file mode 100644
index 000000000..358530bf8
--- /dev/null
+++ b/lib/cloud/cloudtest/tester_test.go
@@ -0,0 +1,97 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package cloudtest
+
+import (
+ "bytes"
+ "testing"
+ "time"
+
+ "git.curoverse.com/arvados.git/lib/cloud"
+ "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+ "golang.org/x/crypto/ssh"
+ check "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+ check.TestingT(t)
+}
+
+var _ = check.Suite(&TesterSuite{})
+
+type TesterSuite struct {
+ stubDriver *test.StubDriver
+ cluster *arvados.Cluster
+ tester *tester
+ log bytes.Buffer
+}
+
+func (s *TesterSuite) SetUpTest(c *check.C) {
+ pubkey, privkey := test.LoadTestKey(c, "../../dispatchcloud/test/sshkey_dispatch")
+ _, privhostkey := test.LoadTestKey(c, "../../dispatchcloud/test/sshkey_vm")
+ s.stubDriver = &test.StubDriver{
+ HostKey: privhostkey,
+ AuthorizedKeys: []ssh.PublicKey{pubkey},
+ ErrorRateDestroy: 0.1,
+ MinTimeBetweenCreateCalls: time.Millisecond,
+ }
+ tagKeyPrefix := "tagprefix:"
+ s.cluster = &arvados.Cluster{
+ ManagementToken: "test-management-token",
+ Containers: arvados.ContainersConfig{
+ CloudVMs: arvados.CloudVMsConfig{
+ SyncInterval: arvados.Duration(10 * time.Millisecond),
+ TimeoutBooting: arvados.Duration(150 * time.Millisecond),
+ TimeoutProbe: arvados.Duration(15 * time.Millisecond),
+ ProbeInterval: arvados.Duration(5 * time.Millisecond),
+ ResourceTags: map[string]string{"testtag": "test value"},
+ },
+ },
+ InstanceTypes: arvados.InstanceTypeMap{
+ test.InstanceType(1).Name: test.InstanceType(1),
+ test.InstanceType(2).Name: test.InstanceType(2),
+ test.InstanceType(3).Name: test.InstanceType(3),
+ },
+ }
+ s.tester = &tester{
+ Logger: ctxlog.New(&s.log, "text", "info"),
+ Tags: cloud.SharedResourceTags{"testtagkey": "testtagvalue"},
+ TagKeyPrefix: tagKeyPrefix,
+ SetID: cloud.InstanceSetID("test-instance-set-id"),
+ ProbeInterval: 5 * time.Millisecond,
+ SyncInterval: 10 * time.Millisecond,
+ TimeoutBooting: 150 * time.Millisecond,
+ Driver: s.stubDriver,
+ DriverParameters: nil,
+ InstanceType: test.InstanceType(2),
+ ImageID: "test-image-id",
+ SSHKey: privkey,
+ BootProbeCommand: "crunch-run --list",
+ ShellCommand: "true",
+ }
+}
+
+func (s *TesterSuite) TestSuccess(c *check.C) {
+ s.tester.Logger = ctxlog.TestLogger(c)
+ ok := s.tester.Run()
+ c.Check(ok, check.Equals, true)
+}
+
+func (s *TesterSuite) TestBootFail(c *check.C) {
+ s.tester.BootProbeCommand = "falsey"
+ ok := s.tester.Run()
+ c.Check(ok, check.Equals, false)
+ c.Check(s.log.String(), check.Matches, `(?ms).*\\"falsey\\": command not found.*`)
+}
+
+func (s *TesterSuite) TestShellCommandFail(c *check.C) {
+ s.tester.ShellCommand = "falsey"
+ ok := s.tester.Run()
+ c.Check(ok, check.Equals, false)
+ c.Check(s.log.String(), check.Matches, `(?ms).*\\"falsey\\": command not found.*`)
+}
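Independent of run-tests.sh, the suite is ordinary gocheck code, so it
can also be run on its own (assuming a GOPATH checkout as set up by
run-tests.sh):

    go test git.curoverse.com/arvados.git/lib/cloud/cloudtest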
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index dc128e56b..bad35a120 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -486,7 +486,7 @@ Clusters:
# Shell command to execute on each worker to determine whether
# the worker is booted and ready to run containers. It should
# exit zero if the worker is ready.
- BootProbeCommand: "docker ps"
+ BootProbeCommand: "docker ps -q"
# Minimum interval between consecutive probes to a single
# worker.
diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go
index 98cd343bd..9ac519071 100644
--- a/lib/config/generated_config.go
+++ b/lib/config/generated_config.go
@@ -492,7 +492,7 @@ Clusters:
# Shell command to execute on each worker to determine whether
# the worker is booted and ready to run containers. It should
# exit zero if the worker is ready.
- BootProbeCommand: "docker ps"
+ BootProbeCommand: "docker ps -q"
# Minimum interval between consecutive probes to a single
# worker.
diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go
index 6b73e71cc..fbe04393a 100644
--- a/lib/dispatchcloud/dispatcher_test.go
+++ b/lib/dispatchcloud/dispatcher_test.go
@@ -107,7 +107,7 @@ func (s *DispatcherSuite) TearDownTest(c *check.C) {
// a fake queue and cloud driver. The fake cloud driver injects
// artificial errors in order to exercise a variety of code paths.
func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
- drivers["test"] = s.stubDriver
+ Drivers["test"] = s.stubDriver
s.disp.setupOnce.Do(s.disp.initialize)
queue := &test.Queue{
ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
@@ -210,7 +210,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
func (s *DispatcherSuite) TestAPIPermissions(c *check.C) {
s.cluster.ManagementToken = "abcdefgh"
- drivers["test"] = s.stubDriver
+ Drivers["test"] = s.stubDriver
s.disp.setupOnce.Do(s.disp.initialize)
s.disp.queue = &test.Queue{}
go s.disp.run()
@@ -232,7 +232,7 @@ func (s *DispatcherSuite) TestAPIPermissions(c *check.C) {
func (s *DispatcherSuite) TestAPIDisabled(c *check.C) {
s.cluster.ManagementToken = ""
- drivers["test"] = s.stubDriver
+ Drivers["test"] = s.stubDriver
s.disp.setupOnce.Do(s.disp.initialize)
s.disp.queue = &test.Queue{}
go s.disp.run()
@@ -251,7 +251,7 @@ func (s *DispatcherSuite) TestAPIDisabled(c *check.C) {
func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
s.cluster.ManagementToken = "abcdefgh"
s.cluster.Containers.CloudVMs.TimeoutBooting = arvados.Duration(time.Second)
- drivers["test"] = s.stubDriver
+ Drivers["test"] = s.stubDriver
s.disp.setupOnce.Do(s.disp.initialize)
s.disp.queue = &test.Queue{}
go s.disp.run()
diff --git a/lib/dispatchcloud/driver.go b/lib/dispatchcloud/driver.go
index a8f3d5b5e..49437e318 100644
--- a/lib/dispatchcloud/driver.go
+++ b/lib/dispatchcloud/driver.go
@@ -17,13 +17,13 @@ import (
"golang.org/x/crypto/ssh"
)
-var drivers = map[string]cloud.Driver{
+var Drivers = map[string]cloud.Driver{
"azure": azure.Driver,
"ec2": ec2.Driver,
}
func newInstanceSet(cluster *arvados.Cluster, setID cloud.InstanceSetID, logger logrus.FieldLogger, reg *prometheus.Registry) (cloud.InstanceSet, error) {
- driver, ok := drivers[cluster.Containers.CloudVMs.Driver]
+ driver, ok := Drivers[cluster.Containers.CloudVMs.Driver]
if !ok {
return nil, fmt.Errorf("unsupported cloud driver %q", cluster.Containers.CloudVMs.Driver)
}
@@ -85,7 +85,7 @@ func (is defaultTaggingInstanceSet) Create(it arvados.InstanceType, image cloud.
return is.InstanceSet.Create(it, image, allTags, init, pk)
}
-// Filters the instances returned by the wrapped InstanceSet's
+// Filter the instances returned by the wrapped InstanceSet's
// Instances() method (in case the wrapped InstanceSet didn't do this
// itself).
type filteringInstanceSet struct {
diff --git a/lib/dispatchcloud/ssh_executor/executor.go b/lib/dispatchcloud/ssh_executor/executor.go
index feed1c2a7..d608763cf 100644
--- a/lib/dispatchcloud/ssh_executor/executor.go
+++ b/lib/dispatchcloud/ssh_executor/executor.go
@@ -173,14 +173,13 @@ func (exr *Executor) sshClient(create bool) (*ssh.Client, error) {
return exr.client, exr.clientErr
}
-// Create a new SSH client.
-func (exr *Executor) setupSSHClient() (*ssh.Client, error) {
- target := exr.Target()
- addr := target.Address()
+func (exr *Executor) TargetHostPort() (string, string) {
+ addr := exr.Target().Address()
if addr == "" {
- return nil, errors.New("instance has no address")
+ return "", ""
}
- if h, p, err := net.SplitHostPort(addr); err != nil || p == "" {
+ h, p, err := net.SplitHostPort(addr)
+ if err != nil || p == "" {
// Target address does not specify a port. Use
// targetPort, or "ssh".
if h == "" {
@@ -189,11 +188,19 @@ func (exr *Executor) setupSSHClient() (*ssh.Client, error) {
if p = exr.targetPort; p == "" {
p = "ssh"
}
- addr = net.JoinHostPort(h, p)
+ }
+ return h, p
+}
+
+// Create a new SSH client.
+func (exr *Executor) setupSSHClient() (*ssh.Client, error) {
+ addr := net.JoinHostPort(exr.TargetHostPort())
+ if addr == ":" {
+ return nil, errors.New("instance has no address")
}
var receivedKey ssh.PublicKey
client, err := ssh.Dial("tcp", addr, &ssh.ClientConfig{
- User: target.RemoteUser(),
+ User: exr.Target().RemoteUser(),
Auth: []ssh.AuthMethod{
ssh.PublicKeys(exr.signers...),
},
@@ -210,7 +217,7 @@ func (exr *Executor) setupSSHClient() (*ssh.Client, error) {
}
if exr.hostKey == nil || !bytes.Equal(exr.hostKey.Marshal(), receivedKey.Marshal()) {
- err = target.VerifyHostKey(receivedKey, client)
+ err = exr.Target().VerifyHostKey(receivedKey, client)
if err != nil {
return nil, err
}
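Factoring TargetHostPort out of setupSSHClient makes the host/port
fallback rules reusable (cloudtest uses them to print login hints). A
short sketch of the behavior, with hypothetical addresses; targetPort
is whatever was set via SetTargetPort:

    host, port := exr.TargetHostPort()
    // Address "10.0.0.5:2222" yields ("10.0.0.5", "2222").
    // Address "10.0.0.5" yields ("10.0.0.5", targetPort), or
    // ("10.0.0.5", "ssh") if no target port was configured.
    // An empty address yields ("", ""), which setupSSHClient
    // reports as "instance has no address".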
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 201e8aad2..e00a8683a 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -292,7 +292,7 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
wp.tagKeyPrefix + tagKeyIdleBehavior: string(IdleBehaviorRun),
wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
}
- initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
+ initCmd := TagVerifier{nil, secret}.InitCommand()
inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
wp.mtx.Lock()
defer wp.mtx.Unlock()
@@ -346,7 +346,7 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior)
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
- inst = tagVerifier{inst, secret}
+ inst = TagVerifier{inst, secret}
id := inst.ID()
if wkr := wp.workers[id]; wkr != nil {
wkr.executor.SetTarget(inst)
diff --git a/lib/dispatchcloud/worker/verify.go b/lib/dispatchcloud/worker/verify.go
index 330071951..c71870210 100644
--- a/lib/dispatchcloud/worker/verify.go
+++ b/lib/dispatchcloud/worker/verify.go
@@ -21,13 +21,17 @@ var (
instanceSecretLength = 40 // hex digits
)
-type tagVerifier struct {
+type TagVerifier struct {
cloud.Instance
- secret string
+ Secret string
}
-func (tv tagVerifier) VerifyHostKey(pubKey ssh.PublicKey, client *ssh.Client) error {
- if err := tv.Instance.VerifyHostKey(pubKey, client); err != cloud.ErrNotImplemented || tv.secret == "" {
+func (tv TagVerifier) InitCommand() cloud.InitCommand {
+ return cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", tv.Secret, instanceSecretFilename))
+}
+
+func (tv TagVerifier) VerifyHostKey(pubKey ssh.PublicKey, client *ssh.Client) error {
+ if err := tv.Instance.VerifyHostKey(pubKey, client); err != cloud.ErrNotImplemented || tv.Secret == "" {
// If the wrapped instance indicates it has a way to
// verify the key, return that decision.
return err
@@ -49,7 +53,7 @@ func (tv tagVerifier) VerifyHostKey(pubKey ssh.PublicKey, client *ssh.Client) er
if err != nil {
return err
}
- if stdout.String() != tv.secret {
+ if stdout.String() != tv.Secret {
return errBadInstanceSecret
}
return nil
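Exporting tagVerifier as TagVerifier and giving it an InitCommand()
method lets the worker pool and the new cloudtest package handle the
boot-time secret identically. A hedged sketch of the round trip (the
secret value and rawInst are illustrative):

    secret := "0123456789abcdef" // in real use, 40 random hex digits
    initCmd := worker.TagVerifier{nil, secret}.InitCommand()
    // initCmd is a shell fragment that writes secret to
    // instanceSecretFilename at first boot.
    inst := worker.TagVerifier{rawInst, secret}
    // When the wrapped instance's VerifyHostKey returns
    // cloud.ErrNotImplemented, inst.VerifyHostKey falls back to
    // reading that file over SSH and comparing its contents to
    // secret.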
-----------------------------------------------------------------------
hooks/post-receive