[ARVADOS] created: 1.3.0-1124-g0684f84ec

Git user git@public.curoverse.com
Fri Jun 21 20:05:20 UTC 2019


        at  0684f84ec50069b24b4564474db67d6b44266699 (commit)


commit 0684f84ec50069b24b4564474db67d6b44266699
Author: Tom Clegg <tclegg@veritasgenetics.com>
Date:   Fri Jun 21 16:01:38 2019 -0400

    15026: Add comments.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

diff --git a/lib/cloud/cloudtest/tester.go b/lib/cloud/cloudtest/tester.go
index 81b14aed8..33ff264bc 100644
--- a/lib/cloud/cloudtest/tester.go
+++ b/lib/cloud/cloudtest/tester.go
@@ -68,11 +68,17 @@ func (t *tester) Run() bool {
 		t.Logger.WithError(err).Info("error initializing driver")
 		return false
 	}
+
+	// Don't send the driver any filters the first time we get the
+	// instance list. This way we can log an instance count
+	// (N=...) that includes all instances in this service
+	// account, even if they don't have the same InstanceSetID.
 	insts, err := t.getInstances(nil)
 	if err != nil {
 		t.Logger.WithError(err).Info("error getting initial list of instances")
 		return false
 	}
+
 	for {
 		foundExisting := false
 		for _, i := range insts {
@@ -129,6 +135,9 @@ func (t *tester) Run() bool {
 	}).Info("creating instance")
 	inst, err := t.is.Create(t.InstanceType, t.ImageID, tags, initCommand, t.SSHKey.PublicKey())
 	if err != nil {
+		// Create() might have returned an error, due to a bug
+		// or network problem, even though the instance was
+		// actually created; it's safer to wait for it to appear.
 		deferredError = true
 		t.Logger.WithError(err).Error("error creating test instance")
 		t.Logger.WithField("Deadline", bootDeadline).Info("waiting for instance to appear anyway, in case the Create response was incorrect")
@@ -143,6 +152,8 @@ func (t *tester) Run() bool {
 		t.Logger.WithField("Instance", t.testInstance.ID()).Info("new instance appeared")
 		t.showLoginInfo()
 	} else {
+		// Create() succeeded. Make sure the new instance
+		// appears right away in the Instances() list.
 		t.Logger.WithField("Instance", inst.ID()).Info("created instance")
 		t.testInstance = inst
 		t.showLoginInfo()
@@ -182,6 +193,11 @@ func (t *tester) Run() bool {
 	return !deferredError
 }
 
+// If the test instance has an address, log an "ssh user@host" command
+// line that the operator can paste into another terminal, and set
+// t.showedLoginInfo.
+//
+// If the test instance doesn't have an address yet, do nothing.
 func (t *tester) showLoginInfo() {
 	t.updateExecutor()
 	host, port := t.executor.TargetHostPort()
@@ -225,6 +241,10 @@ func (t *tester) refreshTestInstance() error {
 	return errTestInstanceNotFound
 }
 
+// Get the list of instances, passing the given tags to the cloud
+// driver to filter results.
+//
+// Return only the instances that have our InstanceSetID tag.
 func (t *tester) getInstances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
 	var ret []cloud.Instance
 	t.Logger.WithField("FilterTags", tags).Info("getting instance list")
@@ -241,6 +261,8 @@ func (t *tester) getInstances(tags cloud.InstanceTags) ([]cloud.Instance, error)
 	return ret, nil
 }
 
+// Check that t.testInstance has every tag in t.Tags. If not, log an
+// error and return false.
 func (t *tester) checkTags() bool {
 	ok := true
 	for k, v := range t.Tags {
@@ -259,6 +281,8 @@ func (t *tester) checkTags() bool {
 	return ok
 }
 
+// Run t.BootProbeCommand on t.testInstance until it succeeds or the
+// deadline arrives.
 func (t *tester) waitForBoot(deadline time.Time) bool {
 	for time.Now().Before(deadline) {
 		err := t.runShellCommand(t.BootProbeCommand)
@@ -272,6 +296,8 @@ func (t *tester) waitForBoot(deadline time.Time) bool {
 	return false
 }
 
+// Create t.executor and/or update its target to t.testInstance's
+// current address.
 func (t *tester) updateExecutor() {
 	if t.executor == nil {
 		t.executor = ssh_executor.New(t.testInstance)
diff --git a/lib/dispatchcloud/driver.go b/lib/dispatchcloud/driver.go
index 49437e318..8d09d6a53 100644
--- a/lib/dispatchcloud/driver.go
+++ b/lib/dispatchcloud/driver.go
@@ -17,6 +17,9 @@ import (
 	"golang.org/x/crypto/ssh"
 )
 
+// Map of available cloud drivers.
+// Clusters.*.Containers.CloudVMs.Driver configuration values
+// correspond to keys in this map.
 var Drivers = map[string]cloud.Driver{
 	"azure": azure.Driver,
 	"ec2":   ec2.Driver,

commit 6d87ae37bbc00cbb6fffb47ade3ec824b501b3df
Author: Tom Clegg <tclegg@veritasgenetics.com>
Date:   Fri Jun 21 14:09:20 2019 -0400

    15026: Add cloudtest command.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

diff --git a/build/run-tests.sh b/build/run-tests.sh
index 97c21b405..6f8f11774 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -80,6 +80,7 @@ lib/controller
 lib/crunchstat
 lib/cloud
 lib/cloud/azure
+lib/cloud/cloudtest
 lib/dispatchcloud
 lib/dispatchcloud/container
 lib/dispatchcloud/scheduler
@@ -734,7 +735,7 @@ do_test() {
         services/api)
             stop_services
             ;;
-        gofmt | govendor | doc | lib/cli | lib/cloud/azure | lib/cloud/ec2 | lib/cmd | lib/dispatchcloud/ssh_executor | lib/dispatchcloud/worker)
+        gofmt | govendor | doc | lib/cli | lib/cloud/azure | lib/cloud/ec2 | lib/cloud/cloudtest | lib/cmd | lib/dispatchcloud/ssh_executor | lib/dispatchcloud/worker)
             # don't care whether services are running
             ;;
         *)
@@ -992,6 +993,7 @@ gostuff=(
     lib/cloud
     lib/cloud/azure
     lib/cloud/ec2
+    lib/cloud/cloudtest
     lib/config
     lib/dispatchcloud
     lib/dispatchcloud/container
diff --git a/cmd/arvados-server/cmd.go b/cmd/arvados-server/cmd.go
index 983159382..2506bd2c9 100644
--- a/cmd/arvados-server/cmd.go
+++ b/cmd/arvados-server/cmd.go
@@ -7,6 +7,7 @@ package main
 import (
 	"os"
 
+	"git.curoverse.com/arvados.git/lib/cloud/cloudtest"
 	"git.curoverse.com/arvados.git/lib/cmd"
 	"git.curoverse.com/arvados.git/lib/config"
 	"git.curoverse.com/arvados.git/lib/controller"
@@ -20,6 +21,7 @@ var (
 		"-version":  cmd.Version(version),
 		"--version": cmd.Version(version),
 
+		"cloudtest":      cloudtest.Command,
 		"config-check":   config.CheckCommand,
 		"config-dump":    config.DumpCommand,
 		"controller":     controller.Command,
diff --git a/lib/cloud/cloudtest/cmd.go b/lib/cloud/cloudtest/cmd.go
new file mode 100644
index 000000000..1f94ea6da
--- /dev/null
+++ b/lib/cloud/cloudtest/cmd.go
@@ -0,0 +1,153 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package cloudtest
+
+import (
+	"bufio"
+	"errors"
+	"flag"
+	"fmt"
+	"io"
+	"os"
+	"os/user"
+
+	"git.curoverse.com/arvados.git/lib/cloud"
+	"git.curoverse.com/arvados.git/lib/config"
+	"git.curoverse.com/arvados.git/lib/dispatchcloud"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"git.curoverse.com/arvados.git/sdk/go/ctxlog"
+	"golang.org/x/crypto/ssh"
+)
+
+var Command command
+
+type command struct{}
+
+func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+	var err error
+	defer func() {
+		if err != nil {
+			fmt.Fprintf(stderr, "%s\n", err)
+		}
+	}()
+
+	flags := flag.NewFlagSet("", flag.ContinueOnError)
+	flags.SetOutput(stderr)
+	configFile := flags.String("config", arvados.DefaultConfigFile, "Site configuration `file`")
+	instanceSetID := flags.String("instance-set-id", defaultInstanceSetID(), "InstanceSetID tag `value` to use on the test instance")
+	imageID := flags.String("image-id", "", "Image ID to use when creating the test instance (if empty, use cluster config)")
+	instanceType := flags.String("instance-type", "", "Instance type to create (if empty, use cheapest type in config)")
+	destroyExisting := flags.Bool("destroy-existing", false, "Destroy any existing instances tagged with our InstanceSetID, instead of erroring out")
+	shellCommand := flags.String("command", "", "Run an interactive shell command on the test instance when it boots")
+	pauseBeforeDestroy := flags.Bool("pause-before-destroy", false, "Prompt and wait before destroying the test instance")
+	err = flags.Parse(args)
+	if err == flag.ErrHelp {
+		err = nil
+		return 0
+	} else if err != nil {
+		return 2
+	}
+
+	if len(flags.Args()) != 0 {
+		flags.Usage()
+		return 2
+	}
+	logger := ctxlog.New(stderr, "text", "info")
+	defer func() {
+		if err != nil {
+			logger.WithError(err).Error("fatal")
+			// suppress output from the other error-printing func
+			err = nil
+		}
+		logger.Info("exiting")
+	}()
+
+	cfg, err := config.LoadFile(*configFile, logger)
+	if err != nil {
+		return 1
+	}
+	cluster, err := cfg.GetCluster("")
+	if err != nil {
+		return 1
+	}
+	key, err := ssh.ParsePrivateKey([]byte(cluster.Containers.DispatchPrivateKey))
+	if err != nil {
+		err = fmt.Errorf("error parsing configured Containers.DispatchPrivateKey: %s", err)
+		return 1
+	}
+	driver, ok := dispatchcloud.Drivers[cluster.Containers.CloudVMs.Driver]
+	if !ok {
+		err = fmt.Errorf("unsupported cloud driver %q", cluster.Containers.CloudVMs.Driver)
+		return 1
+	}
+	if *imageID == "" {
+		*imageID = cluster.Containers.CloudVMs.ImageID
+	}
+	it, err := chooseInstanceType(cluster, *instanceType)
+	if err != nil {
+		return 1
+	}
+	tags := cloud.SharedResourceTags(cluster.Containers.CloudVMs.ResourceTags)
+	tagKeyPrefix := cluster.Containers.CloudVMs.TagKeyPrefix
+	tags[tagKeyPrefix+"CloudTestPID"] = fmt.Sprintf("%d", os.Getpid())
+	if !(&tester{
+		Logger:           logger,
+		Tags:             tags,
+		TagKeyPrefix:     tagKeyPrefix,
+		SetID:            cloud.InstanceSetID(*instanceSetID),
+		DestroyExisting:  *destroyExisting,
+		ProbeInterval:    cluster.Containers.CloudVMs.ProbeInterval.Duration(),
+		SyncInterval:     cluster.Containers.CloudVMs.SyncInterval.Duration(),
+		TimeoutBooting:   cluster.Containers.CloudVMs.TimeoutBooting.Duration(),
+		Driver:           driver,
+		DriverParameters: cluster.Containers.CloudVMs.DriverParameters,
+		ImageID:          cloud.ImageID(*imageID),
+		InstanceType:     it,
+		SSHKey:           key,
+		SSHPort:          cluster.Containers.CloudVMs.SSHPort,
+		BootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
+		ShellCommand:     *shellCommand,
+		PauseBeforeDestroy: func() {
+			if *pauseBeforeDestroy {
+				logger.Info("waiting for operator to press Enter")
+				fmt.Fprint(stderr, "Press Enter to continue: ")
+				bufio.NewReader(stdin).ReadString('\n')
+			}
+		},
+	}).Run() {
+		return 1
+	}
+	return 0
+}
+
+func defaultInstanceSetID() string {
+	username := ""
+	if u, err := user.Current(); err == nil {
+		username = u.Username
+	}
+	hostname, _ := os.Hostname()
+	return fmt.Sprintf("cloudtest-%s@%s", username, hostname)
+}
+
+// Return the named instance type, or the cheapest type if name=="".
+func chooseInstanceType(cluster *arvados.Cluster, name string) (arvados.InstanceType, error) {
+	if len(cluster.InstanceTypes) == 0 {
+		return arvados.InstanceType{}, errors.New("no instance types are configured")
+	} else if name == "" {
+		first := true
+		var best arvados.InstanceType
+		for _, it := range cluster.InstanceTypes {
+			if first || best.Price > it.Price {
+				best = it
+				first = false
+			}
+		}
+		return best, nil
+	} else if it, ok := cluster.InstanceTypes[name]; !ok {
+		return it, fmt.Errorf("requested instance type %q is not configured", name)
+	} else {
+		return it, nil
+	}
+}
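
A detail worth calling out from chooseInstanceType above: when no
-instance-type flag is given, the cheapest configured type is
selected. A sketch with made-up values, assuming cmd.go's existing
imports (the wrapper function, type names, and prices are
hypothetical):

    func exampleCheapestType() {
        cluster := &arvados.Cluster{
            InstanceTypes: arvados.InstanceTypeMap{
                "small": {Name: "small", Price: 0.02}, // hypothetical price
                "large": {Name: "large", Price: 0.16}, // hypothetical price
            },
        }
        it, _ := chooseInstanceType(cluster, "")
        fmt.Println(it.Name) // "small" -- lowest Price wins when name == ""
    }
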
diff --git a/lib/cloud/cloudtest/tester.go b/lib/cloud/cloudtest/tester.go
new file mode 100644
index 000000000..81b14aed8
--- /dev/null
+++ b/lib/cloud/cloudtest/tester.go
@@ -0,0 +1,352 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package cloudtest
+
+import (
+	"crypto/rand"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"time"
+
+	"git.curoverse.com/arvados.git/lib/cloud"
+	"git.curoverse.com/arvados.git/lib/dispatchcloud/ssh_executor"
+	"git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"github.com/sirupsen/logrus"
+	"golang.org/x/crypto/ssh"
+)
+
+var (
+	errTestInstanceNotFound = errors.New("test instance missing from cloud provider's list")
+)
+
+// A tester does a sequence of operations to test a cloud driver and
+// configuration. Run() should be called only once, after assigning
+// suitable values to public fields.
+type tester struct {
+	Logger             logrus.FieldLogger
+	Tags               cloud.SharedResourceTags
+	TagKeyPrefix       string
+	SetID              cloud.InstanceSetID
+	DestroyExisting    bool
+	ProbeInterval      time.Duration
+	SyncInterval       time.Duration
+	TimeoutBooting     time.Duration
+	Driver             cloud.Driver
+	DriverParameters   json.RawMessage
+	InstanceType       arvados.InstanceType
+	ImageID            cloud.ImageID
+	SSHKey             ssh.Signer
+	SSHPort            string
+	BootProbeCommand   string
+	ShellCommand       string
+	PauseBeforeDestroy func()
+
+	is              cloud.InstanceSet
+	testInstance    cloud.Instance
+	secret          string
+	executor        *ssh_executor.Executor
+	showedLoginInfo bool
+
+	failed bool
+}
+
+// Run the test suite as specified, clean up as needed, and return
+// true (everything is OK) or false (something went wrong).
+func (t *tester) Run() bool {
+	// This flag gets set when we encounter a non-fatal error, so
+	// we can continue doing more tests but remember to return
+	// false (failure) at the end.
+	deferredError := false
+
+	var err error
+	t.is, err = t.Driver.InstanceSet(t.DriverParameters, t.SetID, t.Tags, t.Logger)
+	if err != nil {
+		t.Logger.WithError(err).Info("error initializing driver")
+		return false
+	}
+	insts, err := t.getInstances(nil)
+	if err != nil {
+		t.Logger.WithError(err).Info("error getting initial list of instances")
+		return false
+	}
+	for {
+		foundExisting := false
+		for _, i := range insts {
+			if i.Tags()[t.TagKeyPrefix+"InstanceSetID"] != string(t.SetID) {
+				continue
+			}
+			lgr := t.Logger.WithFields(logrus.Fields{
+				"Instance":      i.ID(),
+				"InstanceSetID": t.SetID,
+			})
+			foundExisting = true
+			if t.DestroyExisting {
+				lgr.Info("destroying existing instance with our InstanceSetID")
+				err := i.Destroy()
+				if err != nil {
+					lgr.WithError(err).Error("error destroying existing instance")
+				} else {
+					lgr.Info("Destroy() call succeeded")
+				}
+			} else {
+				lgr.Error("found existing instance with our InstanceSetID")
+			}
+		}
+		if !foundExisting {
+			break
+		} else if t.DestroyExisting {
+			t.sleepSyncInterval()
+		} else {
+			t.Logger.Error("cannot continue with existing instances -- clean up manually, use -destroy-existing=true, or choose a different -instance-set-id")
+			return false
+		}
+	}
+
+	t.secret = randomHex(40)
+
+	tags := cloud.InstanceTags{}
+	for k, v := range t.Tags {
+		tags[k] = v
+	}
+	tags[t.TagKeyPrefix+"InstanceSetID"] = string(t.SetID)
+	tags[t.TagKeyPrefix+"InstanceSecret"] = t.secret
+
+	defer t.destroyTestInstance()
+
+	bootDeadline := time.Now().Add(t.TimeoutBooting)
+	initCommand := worker.TagVerifier{nil, t.secret}.InitCommand()
+
+	t.Logger.WithFields(logrus.Fields{
+		"InstanceType":         t.InstanceType.Name,
+		"ProviderInstanceType": t.InstanceType.ProviderType,
+		"ImageID":              t.ImageID,
+		"Tags":                 tags,
+		"InitCommand":          initCommand,
+	}).Info("creating instance")
+	inst, err := t.is.Create(t.InstanceType, t.ImageID, tags, initCommand, t.SSHKey.PublicKey())
+	if err != nil {
+		deferredError = true
+		t.Logger.WithError(err).Error("error creating test instance")
+		t.Logger.WithField("Deadline", bootDeadline).Info("waiting for instance to appear anyway, in case the Create response was incorrect")
+		for err = t.refreshTestInstance(); err != nil; err = t.refreshTestInstance() {
+			if time.Now().After(bootDeadline) {
+				t.Logger.Error("timed out")
+				return false
+			} else {
+				t.sleepSyncInterval()
+			}
+		}
+		t.Logger.WithField("Instance", t.testInstance.ID()).Info("new instance appeared")
+		t.showLoginInfo()
+	} else {
+		t.Logger.WithField("Instance", inst.ID()).Info("created instance")
+		t.testInstance = inst
+		t.showLoginInfo()
+		err = t.refreshTestInstance()
+		if err == errTestInstanceNotFound {
+			t.Logger.WithError(err).Error("cloud/driver Create succeeded, but instance is not in list")
+			deferredError = true
+		} else if err != nil {
+			t.Logger.WithError(err).Error("error getting list of instances")
+			return false
+		}
+	}
+	t.testInstance = worker.TagVerifier{t.testInstance, t.secret}
+
+	if !t.checkTags() {
+		// checkTags() already logged the errors
+		return false
+	}
+
+	if !t.waitForBoot(bootDeadline) {
+		return false
+	}
+
+	if t.ShellCommand != "" {
+		err = t.runShellCommand(t.ShellCommand)
+		if err != nil {
+			t.Logger.WithError(err).Error("shell command failed")
+			deferredError = true
+		}
+	}
+
+	if fn := t.PauseBeforeDestroy; fn != nil {
+		t.showLoginInfo()
+		fn()
+	}
+
+	return !deferredError
+}
+
+func (t *tester) showLoginInfo() {
+	t.updateExecutor()
+	host, port := t.executor.TargetHostPort()
+	if host == "" {
+		return
+	}
+	user := t.testInstance.RemoteUser()
+	t.Logger.WithField("Command", fmt.Sprintf("ssh -p%s %s@%s", port, user, host)).Info("showing login information")
+	t.showedLoginInfo = true
+}
+
+// Get the latest instance list from the driver. If our test instance
+// is found, assign it to t.testInstance.
+func (t *tester) refreshTestInstance() error {
+	insts, err := t.getInstances(cloud.InstanceTags{t.TagKeyPrefix + "InstanceSetID": string(t.SetID)})
+	if err != nil {
+		return err
+	}
+	for _, i := range insts {
+		if t.testInstance == nil {
+			// Filter by InstanceSetID tag value
+			if i.Tags()[t.TagKeyPrefix+"InstanceSetID"] != string(t.SetID) {
+				continue
+			}
+		} else {
+			// Filter by instance ID
+			if i.ID() != t.testInstance.ID() {
+				continue
+			}
+		}
+		t.Logger.WithFields(logrus.Fields{
+			"Instance": i.ID(),
+			"Address":  i.Address(),
+		}).Info("found our instance in returned list")
+		t.testInstance = worker.TagVerifier{i, t.secret}
+		if !t.showedLoginInfo {
+			t.showLoginInfo()
+		}
+		return nil
+	}
+	return errTestInstanceNotFound
+}
+
+func (t *tester) getInstances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
+	var ret []cloud.Instance
+	t.Logger.WithField("FilterTags", tags).Info("getting instance list")
+	insts, err := t.is.Instances(tags)
+	if err != nil {
+		return nil, err
+	}
+	t.Logger.WithField("N", len(insts)).Info("got instance list")
+	for _, i := range insts {
+		if i.Tags()[t.TagKeyPrefix+"InstanceSetID"] == string(t.SetID) {
+			ret = append(ret, i)
+		}
+	}
+	return ret, nil
+}
+
+func (t *tester) checkTags() bool {
+	ok := true
+	for k, v := range t.Tags {
+		if got := t.testInstance.Tags()[k]; got != v {
+			ok = false
+			t.Logger.WithFields(logrus.Fields{
+				"Key":           k,
+				"ExpectedValue": v,
+				"GotValue":      got,
+			}).Error("tag is missing from test instance")
+		}
+	}
+	if ok {
+		t.Logger.Info("all expected tags are present")
+	}
+	return ok
+}
+
+func (t *tester) waitForBoot(deadline time.Time) bool {
+	for time.Now().Before(deadline) {
+		err := t.runShellCommand(t.BootProbeCommand)
+		if err == nil {
+			return true
+		}
+		t.sleepProbeInterval()
+		t.refreshTestInstance()
+	}
+	t.Logger.Error("timed out")
+	return false
+}
+
+func (t *tester) updateExecutor() {
+	if t.executor == nil {
+		t.executor = ssh_executor.New(t.testInstance)
+		t.executor.SetTargetPort(t.SSHPort)
+		t.executor.SetSigners(t.SSHKey)
+	} else {
+		t.executor.SetTarget(t.testInstance)
+	}
+}
+
+func (t *tester) runShellCommand(cmd string) error {
+	t.updateExecutor()
+	t.Logger.WithFields(logrus.Fields{
+		"Command": cmd,
+	}).Info("executing remote command")
+	stdout, stderr, err := t.executor.Execute(nil, cmd, nil)
+	lgr := t.Logger.WithFields(logrus.Fields{
+		"Command": cmd,
+		"stdout":  string(stdout),
+		"stderr":  string(stderr),
+	})
+	if err != nil {
+		lgr.WithError(err).Info("remote command failed")
+	} else {
+		lgr.Info("remote command succeeded")
+	}
+	return err
+}
+
+// Currently, this tries forever until it can return true (success).
+func (t *tester) destroyTestInstance() bool {
+	if t.testInstance == nil {
+		return true
+	}
+	for {
+		t.Logger.WithField("Instance", t.testInstance.ID()).Info("destroying instance")
+		err := t.testInstance.Destroy()
+		if err != nil {
+			t.Logger.WithError(err).WithField("Instance", t.testInstance.ID()).Error("error destroying instance")
+		} else {
+			t.Logger.WithField("Instance", t.testInstance.ID()).Info("destroyed instance")
+		}
+		err = t.refreshTestInstance()
+		if err == errTestInstanceNotFound {
+			t.Logger.WithField("Instance", t.testInstance.ID()).Info("instance no longer appears in list")
+			t.testInstance = nil
+			return true
+		} else if err == nil {
+			t.Logger.WithField("Instance", t.testInstance.ID()).Info("instance still exists after calling Destroy")
+			t.sleepSyncInterval()
+			continue
+		} else {
+			t.Logger.WithError(err).Error("error getting list of instances")
+			continue
+		}
+	}
+}
+
+func (t *tester) sleepSyncInterval() {
+	t.Logger.WithField("Duration", t.SyncInterval).Info("waiting SyncInterval")
+	time.Sleep(t.SyncInterval)
+}
+
+func (t *tester) sleepProbeInterval() {
+	t.Logger.WithField("Duration", t.ProbeInterval).Info("waiting ProbeInterval")
+	time.Sleep(t.ProbeInterval)
+}
+
+// Return a random string of n hexadecimal digits (n*4 random bits). n
+// must be even.
+func randomHex(n int) string {
+	buf := make([]byte, n/2)
+	_, err := rand.Read(buf)
+	if err != nil {
+		panic(err)
+	}
+	return fmt.Sprintf("%x", buf)
+}
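
The InstanceSecret handshake in tester.go ties three pieces together:
Run() tags the new instance with a random secret, the TagVerifier's
InitCommand() becomes the boot script that writes the secret to a
file on the instance, and TagVerifier.VerifyHostKey (see
lib/dispatchcloud/worker/verify.go below) reads that file back over
SSH to confirm the tester reached the instance it created. Restating
the relevant calls from the diff (the secret file path is a constant
in the worker package, not shown here):

    secret := randomHex(40) // 40 hex digits = 160 random bits
    tags[t.TagKeyPrefix+"InstanceSecret"] = secret
    initCmd := worker.TagVerifier{nil, secret}.InitCommand()
    // initCmd expands to roughly:
    //   umask 0177 && echo -n "<secret>" > <secret file>
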
diff --git a/lib/cloud/cloudtest/tester_test.go b/lib/cloud/cloudtest/tester_test.go
new file mode 100644
index 000000000..358530bf8
--- /dev/null
+++ b/lib/cloud/cloudtest/tester_test.go
@@ -0,0 +1,97 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package cloudtest
+
+import (
+	"bytes"
+	"testing"
+	"time"
+
+	"git.curoverse.com/arvados.git/lib/cloud"
+	"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"git.curoverse.com/arvados.git/sdk/go/ctxlog"
+	"golang.org/x/crypto/ssh"
+	check "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+	check.TestingT(t)
+}
+
+var _ = check.Suite(&TesterSuite{})
+
+type TesterSuite struct {
+	stubDriver *test.StubDriver
+	cluster    *arvados.Cluster
+	tester     *tester
+	log        bytes.Buffer
+}
+
+func (s *TesterSuite) SetUpTest(c *check.C) {
+	pubkey, privkey := test.LoadTestKey(c, "../../dispatchcloud/test/sshkey_dispatch")
+	_, privhostkey := test.LoadTestKey(c, "../../dispatchcloud/test/sshkey_vm")
+	s.stubDriver = &test.StubDriver{
+		HostKey:                   privhostkey,
+		AuthorizedKeys:            []ssh.PublicKey{pubkey},
+		ErrorRateDestroy:          0.1,
+		MinTimeBetweenCreateCalls: time.Millisecond,
+	}
+	tagKeyPrefix := "tagprefix:"
+	s.cluster = &arvados.Cluster{
+		ManagementToken: "test-management-token",
+		Containers: arvados.ContainersConfig{
+			CloudVMs: arvados.CloudVMsConfig{
+				SyncInterval:   arvados.Duration(10 * time.Millisecond),
+				TimeoutBooting: arvados.Duration(150 * time.Millisecond),
+				TimeoutProbe:   arvados.Duration(15 * time.Millisecond),
+				ProbeInterval:  arvados.Duration(5 * time.Millisecond),
+				ResourceTags:   map[string]string{"testtag": "test value"},
+			},
+		},
+		InstanceTypes: arvados.InstanceTypeMap{
+			test.InstanceType(1).Name: test.InstanceType(1),
+			test.InstanceType(2).Name: test.InstanceType(2),
+			test.InstanceType(3).Name: test.InstanceType(3),
+		},
+	}
+	s.tester = &tester{
+		Logger:           ctxlog.New(&s.log, "text", "info"),
+		Tags:             cloud.SharedResourceTags{"testtagkey": "testtagvalue"},
+		TagKeyPrefix:     tagKeyPrefix,
+		SetID:            cloud.InstanceSetID("test-instance-set-id"),
+		ProbeInterval:    5 * time.Millisecond,
+		SyncInterval:     10 * time.Millisecond,
+		TimeoutBooting:   150 * time.Millisecond,
+		Driver:           s.stubDriver,
+		DriverParameters: nil,
+		InstanceType:     test.InstanceType(2),
+		ImageID:          "test-image-id",
+		SSHKey:           privkey,
+		BootProbeCommand: "crunch-run --list",
+		ShellCommand:     "true",
+	}
+}
+
+func (s *TesterSuite) TestSuccess(c *check.C) {
+	s.tester.Logger = ctxlog.TestLogger(c)
+	ok := s.tester.Run()
+	c.Check(ok, check.Equals, true)
+}
+
+func (s *TesterSuite) TestBootFail(c *check.C) {
+	s.tester.BootProbeCommand = "falsey"
+	ok := s.tester.Run()
+	c.Check(ok, check.Equals, false)
+	c.Check(s.log.String(), check.Matches, `(?ms).*\\"falsey\\": command not found.*`)
+}
+
+func (s *TesterSuite) TestShellCommandFail(c *check.C) {
+	s.tester.ShellCommand = "falsey"
+	ok := s.tester.Run()
+	c.Check(ok, check.Equals, false)
+	c.Check(s.log.String(), check.Matches, `(?ms).*\\"falsey\\": command not found.*`)
+}
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index dc128e56b..bad35a120 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -486,7 +486,7 @@ Clusters:
         # Shell command to execute on each worker to determine whether
         # the worker is booted and ready to run containers. It should
         # exit zero if the worker is ready.
-        BootProbeCommand: "docker ps"
+        BootProbeCommand: "docker ps -q"
 
         # Minimum interval between consecutive probes to a single
         # worker.
diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go
index 98cd343bd..9ac519071 100644
--- a/lib/config/generated_config.go
+++ b/lib/config/generated_config.go
@@ -492,7 +492,7 @@ Clusters:
         # Shell command to execute on each worker to determine whether
         # the worker is booted and ready to run containers. It should
         # exit zero if the worker is ready.
-        BootProbeCommand: "docker ps"
+        BootProbeCommand: "docker ps -q"
 
         # Minimum interval between consecutive probes to a single
         # worker.
diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go
index 6b73e71cc..fbe04393a 100644
--- a/lib/dispatchcloud/dispatcher_test.go
+++ b/lib/dispatchcloud/dispatcher_test.go
@@ -107,7 +107,7 @@ func (s *DispatcherSuite) TearDownTest(c *check.C) {
 // a fake queue and cloud driver. The fake cloud driver injects
 // artificial errors in order to exercise a variety of code paths.
 func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
-	drivers["test"] = s.stubDriver
+	Drivers["test"] = s.stubDriver
 	s.disp.setupOnce.Do(s.disp.initialize)
 	queue := &test.Queue{
 		ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
@@ -210,7 +210,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
 
 func (s *DispatcherSuite) TestAPIPermissions(c *check.C) {
 	s.cluster.ManagementToken = "abcdefgh"
-	drivers["test"] = s.stubDriver
+	Drivers["test"] = s.stubDriver
 	s.disp.setupOnce.Do(s.disp.initialize)
 	s.disp.queue = &test.Queue{}
 	go s.disp.run()
@@ -232,7 +232,7 @@ func (s *DispatcherSuite) TestAPIPermissions(c *check.C) {
 
 func (s *DispatcherSuite) TestAPIDisabled(c *check.C) {
 	s.cluster.ManagementToken = ""
-	drivers["test"] = s.stubDriver
+	Drivers["test"] = s.stubDriver
 	s.disp.setupOnce.Do(s.disp.initialize)
 	s.disp.queue = &test.Queue{}
 	go s.disp.run()
@@ -251,7 +251,7 @@ func (s *DispatcherSuite) TestAPIDisabled(c *check.C) {
 func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
 	s.cluster.ManagementToken = "abcdefgh"
 	s.cluster.Containers.CloudVMs.TimeoutBooting = arvados.Duration(time.Second)
-	drivers["test"] = s.stubDriver
+	Drivers["test"] = s.stubDriver
 	s.disp.setupOnce.Do(s.disp.initialize)
 	s.disp.queue = &test.Queue{}
 	go s.disp.run()
diff --git a/lib/dispatchcloud/driver.go b/lib/dispatchcloud/driver.go
index a8f3d5b5e..49437e318 100644
--- a/lib/dispatchcloud/driver.go
+++ b/lib/dispatchcloud/driver.go
@@ -17,13 +17,13 @@ import (
 	"golang.org/x/crypto/ssh"
 )
 
-var drivers = map[string]cloud.Driver{
+var Drivers = map[string]cloud.Driver{
 	"azure": azure.Driver,
 	"ec2":   ec2.Driver,
 }
 
 func newInstanceSet(cluster *arvados.Cluster, setID cloud.InstanceSetID, logger logrus.FieldLogger, reg *prometheus.Registry) (cloud.InstanceSet, error) {
-	driver, ok := drivers[cluster.Containers.CloudVMs.Driver]
+	driver, ok := Drivers[cluster.Containers.CloudVMs.Driver]
 	if !ok {
 		return nil, fmt.Errorf("unsupported cloud driver %q", cluster.Containers.CloudVMs.Driver)
 	}
@@ -85,7 +85,7 @@ func (is defaultTaggingInstanceSet) Create(it arvados.InstanceType, image cloud.
 	return is.InstanceSet.Create(it, image, allTags, init, pk)
 }
 
-// Filters the instances returned by the wrapped InstanceSet's
+// Filter the instances returned by the wrapped InstanceSet's
 // Instances() method (in case the wrapped InstanceSet didn't do this
 // itself).
 type filteringInstanceSet struct {
diff --git a/lib/dispatchcloud/ssh_executor/executor.go b/lib/dispatchcloud/ssh_executor/executor.go
index feed1c2a7..d608763cf 100644
--- a/lib/dispatchcloud/ssh_executor/executor.go
+++ b/lib/dispatchcloud/ssh_executor/executor.go
@@ -173,14 +173,15 @@ func (exr *Executor) sshClient(create bool) (*ssh.Client, error) {
 	return exr.client, exr.clientErr
 }
 
-// Create a new SSH client.
-func (exr *Executor) setupSSHClient() (*ssh.Client, error) {
-	target := exr.Target()
-	addr := target.Address()
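+// TargetHostPort returns the target's host and port, applying
+// targetPort or "ssh" as the port when the address omits one.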
+func (exr *Executor) TargetHostPort() (string, string) {
+	addr := exr.Target().Address()
 	if addr == "" {
-		return nil, errors.New("instance has no address")
+		return "", ""
 	}
-	if h, p, err := net.SplitHostPort(addr); err != nil || p == "" {
+	h, p, err := net.SplitHostPort(addr)
+	if err != nil || p == "" {
 		// Target address does not specify a port.  Use
 		// targetPort, or "ssh".
 		if h == "" {
@@ -189,11 +190,19 @@ func (exr *Executor) setupSSHClient() (*ssh.Client, error) {
 		if p = exr.targetPort; p == "" {
 			p = "ssh"
 		}
-		addr = net.JoinHostPort(h, p)
+	}
+	return h, p
+}
+
+// Create a new SSH client.
+func (exr *Executor) setupSSHClient() (*ssh.Client, error) {
+	addr := net.JoinHostPort(exr.TargetHostPort())
+	if addr == ":" {
+		return nil, errors.New("instance has no address")
 	}
 	var receivedKey ssh.PublicKey
 	client, err := ssh.Dial("tcp", addr, &ssh.ClientConfig{
-		User: target.RemoteUser(),
+		User: exr.Target().RemoteUser(),
 		Auth: []ssh.AuthMethod{
 			ssh.PublicKeys(exr.signers...),
 		},
@@ -210,7 +219,7 @@ func (exr *Executor) setupSSHClient() (*ssh.Client, error) {
 	}
 
 	if exr.hostKey == nil || !bytes.Equal(exr.hostKey.Marshal(), receivedKey.Marshal()) {
-		err = target.VerifyHostKey(receivedKey, client)
+		err = exr.Target().VerifyHostKey(receivedKey, client)
 		if err != nil {
 			return nil, err
 		}
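
The TargetHostPort refactoring above lets cloudtest's showLoginInfo
reuse the executor's address-defaulting rules. Its behavior under
assumed example inputs:

    // "10.0.0.5:2222" -> ("10.0.0.5", "2222")
    // "10.0.0.5"      -> ("10.0.0.5", targetPort, or "ssh" if unset)
    // ""              -> ("", "")  (setupSSHClient then reports
    //                               "instance has no address")
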
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 201e8aad2..e00a8683a 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -292,7 +292,7 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
 			wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
 			wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
 		}
-		initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
+		initCmd := TagVerifier{nil, secret}.InitCommand()
 		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
 		wp.mtx.Lock()
 		defer wp.mtx.Unlock()
@@ -346,7 +346,7 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior)
 // Caller must have lock.
 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
 	secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
-	inst = tagVerifier{inst, secret}
+	inst = TagVerifier{inst, secret}
 	id := inst.ID()
 	if wkr := wp.workers[id]; wkr != nil {
 		wkr.executor.SetTarget(inst)
diff --git a/lib/dispatchcloud/worker/verify.go b/lib/dispatchcloud/worker/verify.go
index 330071951..c71870210 100644
--- a/lib/dispatchcloud/worker/verify.go
+++ b/lib/dispatchcloud/worker/verify.go
@@ -21,13 +21,17 @@ var (
 	instanceSecretLength   = 40 // hex digits
 )
 
-type tagVerifier struct {
+type TagVerifier struct {
 	cloud.Instance
-	secret string
+	Secret string
 }
 
-func (tv tagVerifier) VerifyHostKey(pubKey ssh.PublicKey, client *ssh.Client) error {
-	if err := tv.Instance.VerifyHostKey(pubKey, client); err != cloud.ErrNotImplemented || tv.secret == "" {
+func (tv TagVerifier) InitCommand() cloud.InitCommand {
+	return cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", tv.Secret, instanceSecretFilename))
+}
+
+func (tv TagVerifier) VerifyHostKey(pubKey ssh.PublicKey, client *ssh.Client) error {
+	if err := tv.Instance.VerifyHostKey(pubKey, client); err != cloud.ErrNotImplemented || tv.Secret == "" {
 		// If the wrapped instance indicates it has a way to
 		// verify the key, return that decision.
 		return err
@@ -49,7 +53,7 @@ func (tv tagVerifier) VerifyHostKey(pubKey ssh.PublicKey, client *ssh.Client) er
 	if err != nil {
 		return err
 	}
-	if stdout.String() != tv.secret {
+	if stdout.String() != tv.Secret {
 		return errBadInstanceSecret
 	}
 	return nil

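
Taken together, the new subcommand can be exercised with something
like the following invocation (the config path and instance type are
assumptions; the flags are the ones defined in
lib/cloud/cloudtest/cmd.go above):

    arvados-server cloudtest \
        -config /etc/arvados/config.yml \
        -instance-type m4.large \
        -destroy-existing \
        -pause-before-destroy
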
-----------------------------------------------------------------------

