[ARVADOS] updated: 1.2.0-9-g587678dee

Git user git at public.curoverse.com
Fri Aug 24 15:35:52 EDT 2018


Summary of changes:
 build/run-tests.sh                             |   1 +
 lib/cloud/interfaces.go                        |  60 +++-
 lib/dispatchcloud/test/lame_provider.go        | 118 ++++++++
 lib/dispatchcloud/{ => worker}/gocheck_test.go |   2 +-
 lib/dispatchcloud/worker/pool.go               | 379 ++++++++-----------------
 lib/dispatchcloud/worker/pool_test.go          | 101 +++++--
 lib/dispatchcloud/worker/worker.go             | 178 ++++++++++++
 sdk/go/arvados/config.go                       |  11 +
 8 files changed, 560 insertions(+), 290 deletions(-)
 create mode 100644 lib/dispatchcloud/test/lame_provider.go
 copy lib/dispatchcloud/{ => worker}/gocheck_test.go (90%)
 create mode 100644 lib/dispatchcloud/worker/worker.go

  discards  7fef859a22f5c27a4e5e08a9f7c9559dc5daca3d (commit)
       via  587678dee4c33ad07db18897b71b732567e56661 (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the old revision is not a strict subset of the new revision.  This
situation occurs when you --force push a change and generate a repository
containing something like this:

 * -- * -- B -- O -- O -- O (7fef859a22f5c27a4e5e08a9f7c9559dc5daca3d)
            \
             N -- N -- N (587678dee4c33ad07db18897b71b732567e56661)

When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 587678dee4c33ad07db18897b71b732567e56661
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Fri Aug 24 15:35:38 2018 -0400

    13964: sketch
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/build/run-build-packages.sh b/build/run-build-packages.sh
index caebac013..11588913e 100755
--- a/build/run-build-packages.sh
+++ b/build/run-build-packages.sh
@@ -295,6 +295,8 @@ package_go_binary cmd/arvados-server arvados-server \
     "Arvados server daemons"
 package_go_binary cmd/arvados-server arvados-controller \
     "Arvados cluster controller daemon"
+package_go_binary cmd/arvados-server arvados-dispatch-cloud \
+    "Arvados cluster cloud dispatch"
 package_go_binary sdk/go/crunchrunner crunchrunner \
     "Crunchrunner executes a command inside a container and uploads the output"
 package_go_binary services/arv-git-httpd arvados-git-httpd \
diff --git a/build/run-tests.sh b/build/run-tests.sh
index e669e326c..2ade688af 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -77,6 +77,7 @@ lib/cmd
 lib/controller
 lib/crunchstat
 lib/dispatchcloud
+lib/dispatchcloud/worker
 services/api
 services/arv-git-httpd
 services/crunchstat
@@ -917,6 +918,7 @@ gostuff=(
     lib/controller
     lib/crunchstat
     lib/dispatchcloud
+    lib/dispatchcloud/worker
     sdk/go/arvados
     sdk/go/arvadosclient
     sdk/go/blockdigest
diff --git a/cmd/arvados-server/arvados-dispatch-cloud.service b/cmd/arvados-server/arvados-dispatch-cloud.service
new file mode 100644
index 000000000..5ea5d45e7
--- /dev/null
+++ b/cmd/arvados-server/arvados-dispatch-cloud.service
@@ -0,0 +1,28 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+[Unit]
+Description=Arvados cloud dispatch
+Documentation=https://doc.arvados.org/
+After=network.target
+AssertPathExists=/etc/arvados/config.yml
+
+# systemd==229 (ubuntu:xenial) obeys StartLimitInterval in the [Unit] section
+StartLimitInterval=0
+
+# systemd>=230 (debian:9) obeys StartLimitIntervalSec in the [Unit] section
+StartLimitIntervalSec=0
+
+[Service]
+Type=notify
+EnvironmentFile=-/etc/arvados/environment
+ExecStart=/usr/bin/arvados-dispatch-cloud
+Restart=always
+RestartSec=1
+
+# systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section
+StartLimitInterval=0
+
+[Install]
+WantedBy=multi-user.target
diff --git a/cmd/arvados-server/cmd.go b/cmd/arvados-server/cmd.go
index 1af3745df..cd15d25dd 100644
--- a/cmd/arvados-server/cmd.go
+++ b/cmd/arvados-server/cmd.go
@@ -9,6 +9,7 @@ import (
 
 	"git.curoverse.com/arvados.git/lib/cmd"
 	"git.curoverse.com/arvados.git/lib/controller"
+	"git.curoverse.com/arvados.git/lib/dispatchcloud"
 )
 
 var (
@@ -18,7 +19,8 @@ var (
 		"-version":  cmd.Version(version),
 		"--version": cmd.Version(version),
 
-		"controller": controller.Command,
+		"controller":     controller.Command,
+		"dispatch-cloud": dispatchcloud.Command,
 	})
 )
 
diff --git a/lib/cloud/interfaces.go b/lib/cloud/interfaces.go
new file mode 100644
index 000000000..0a2ec47bd
--- /dev/null
+++ b/lib/cloud/interfaces.go
@@ -0,0 +1,140 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package cloud
+
+import (
+	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"golang.org/x/crypto/ssh"
+)
+
+// A RateLimitError should be returned by an InstanceSet when the
+// cloud service indicates it is rejecting all API calls for some time
+// interval.
+type RateLimitError interface {
+	// Time before which the caller should expect requests to
+	// fail.
+	EarliestRetry() time.Time
+	error
+}
+
+// A QuotaError should be returned by an InstanceSet when the cloud
+// service indicates the account cannot create more VMs than already
+// exist.
+type QuotaError interface {
+	// If true, don't create more instances until some existing
+	// instances are destroyed. If false, don't handle the error
+	// as a quota error.
+	IsQuotaError() bool
+	error
+}
+
+type InstanceSetID string
+type InstanceTags map[string]string
+type InstanceID string
+type ImageID string
+
+// Instance is implemented by the provider-specific instance types.
+type Instance interface {
+	// ID returns the provider's instance ID. It must be stable
+	// for the life of the instance.
+	ID() InstanceID
+
+	// String typically returns the cloud-provided instance ID.
+	String() string
+
+	// Cloud provider's "instance type" ID. Matches a ProviderType
+	// in the cluster's InstanceTypes configuration.
+	ProviderType() string
+
+	// Get current tags
+	Tags() InstanceTags
+
+	// Replace tags with the given tags
+	SetTags(InstanceTags) error
+
+	// Shut down the node
+	Destroy() error
+
+	// SSH server hostname or IP address, or empty string if
+	// unknown while instance is booting.
+	Address() string
+
+	// Return nil if the given public key matches the instance's
+	// SSH server key. If the provided Dialer is not nil,
+	// VerifyPublicKey can use it to make outgoing network
+	// connections from the instance -- e.g., to use the cloud's
+	// "this instance's metadata" API.
+	VerifyPublicKey(ssh.PublicKey, *ssh.Client) error
+}
+
+// An InstanceSet manages a set of VM instances created by an elastic
+// cloud provider like AWS, GCE, or Azure.
+//
+// All public methods of an InstanceSet, and all public methods of the
+// instances it returns, are goroutine safe.
+type InstanceSet interface {
+	// Create a new instance. If supported by the driver, add the
+	// provided public key to /root/.ssh/authorized_keys.
+	//
+	// The returned error should implement RateLimitError and
+	// QuotaError where applicable.
+	Create(arvados.InstanceType, ImageID, InstanceTags, ssh.PublicKey) (Instance, error)
+
+	// Return all instances, including ones that are booting or
+	// shutting down. Optionally, filter out nodes that don't have
+	// all of the given InstanceTags (the caller will ignore these
+	// anyway).
+	//
+	// An instance returned by successive calls to Instances() may
+	// -- but does not need to -- be represented by the same
+	// Instance object each time. Thus, the caller is responsible
+	// for de-duplicating the returned instances by comparing the
+	// InstanceIDs returned by the instances' ID() methods.
+	Instances(InstanceTags) ([]Instance, error)
+
+	// Stop any background tasks and release other resources.
+	Stop()
+}
+
+// A Driver returns an InstanceSet that uses the given
+// driver-dependent configuration parameters.
+//
+// The supplied id will be of the form "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+// where each z can be any alphanum. The returned InstanceSet must use
+// this id to tag long-lived cloud resources that it creates, and must
+// assume control of any existing resources that are tagged with the
+// same id. Tagging can be accomplished by including the ID in
+// resource names, using the cloud provider's tagging feature, or any
+// other mechanism. The tags must be visible to another instance of
+// the same driver running on a different host.
+//
+// The returned InstanceSet must ignore existing resources that are
+// visible but not tagged with the given id, except that it should log
+// a summary of such resources -- only once -- when it starts
+// up. Thus, two identically configured providers running on different
+// hosts with different ids should log about the existence of each
+// other's resources at startup, but will not interfere with each
+// other.
+//
+// Example:
+//
+//	type provider struct {
+//		ownID     string
+//		AccessKey string
+//	}
+//
+//	func ExampleDriver(config map[string]interface{}, id InstanceSetID) (InstanceSet, error) {
+//		var p provider
+//		if err := mapstructure.Decode(config, &p); err != nil {
+//			return nil, err
+//		}
+//		p.ownID = id
+//		return &p, nil
+//	}
+//
+//	var _ = registerCloudDriver("example", ExampleDriver)
+type Driver func(config map[string]interface{}, id InstanceSetID) (InstanceSet, error)
diff --git a/lib/dispatchcloud/cmd.go b/lib/dispatchcloud/cmd.go
new file mode 100644
index 000000000..b2bc91300
--- /dev/null
+++ b/lib/dispatchcloud/cmd.go
@@ -0,0 +1,17 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+	"git.curoverse.com/arvados.git/lib/cmd"
+	"git.curoverse.com/arvados.git/lib/service"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+var Command cmd.Handler = service.Command(arvados.ServiceNameDispatchCloud, newHandler)
+
+func newHandler(cluster *arvados.Cluster, np *arvados.NodeProfile) service.Handler {
+	return &dispatcher{Cluster: cluster, NodeProfile: np}
+}
diff --git a/lib/dispatchcloud/container_queue.go b/lib/dispatchcloud/container_queue.go
new file mode 100644
index 000000000..f8c0512b6
--- /dev/null
+++ b/lib/dispatchcloud/container_queue.go
@@ -0,0 +1,201 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+	"sync"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+type containerQueue struct {
+	Logger  logger
+	Cluster *arvados.Cluster
+	Client  *arvados.Client
+
+	current   map[string]*arvados.Container
+	mtx       sync.Mutex
+	keeplocal map[string]struct{}
+}
+
+func (cq *containerQueue) Forget(uuid string) {
+	cq.mtx.Lock()
+	defer cq.mtx.Unlock()
+	ctr := cq.current[uuid]
+	if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled {
+		delete(cq.current, uuid)
+	}
+}
+
+func (cq *containerQueue) Get(uuid) (arvados.Container, bool) {
+	cq.mtx.Lock()
+	defer cq.mtx.Unlock()
+	if ctr, ok := cq.current[uuid]; !ok {
+		return arvados.Container{}, false
+	} else {
+		return *ctr
+	}
+}
+
+func (cq *containerQueue) All() map[string]arvados.Container {
+	cq.mtx.Lock()
+	defer cq.mtx.Unlock()
+	ret := make(map[string]*arvados.Container, len(cq.current))
+	for uuid, ctr := range cq.current {
+		ret[uuid] = *ctr
+	}
+	return ret
+}
+
+func (cq *containerQueue) setup() {
+	cq.current.Store(map[string]*arvados.Container{})
+}
+
+func (cq *containerQueue) update() error {
+	cq.mtx.Lock()
+	cq.keeplocal = map[string]struct{}{}
+	cq.mtx.Unlock()
+
+	defer func() { cq.mtx.Lock(); cq.keeplocal = nil; cq.mtx.Unlock() }()
+
+	next, err := cq.poll()
+	if err != nil {
+		return err
+	}
+	cq.mtx.Lock()
+	defer cq.mtx.Unlock()
+	for uuid := range cq.keeplocal {
+		next[uuid] = cq.current[uuid]
+	}
+	cq.current = next
+}
+
+func (cq *containerQueue) Lock(uuid string) error {
+	return cq.apiUpdate(uuid, "lock")
+}
+
+func (cq *containerQueue) Unlock(uuid string) error {
+	return cq.apiUpdate(uuid, "unlock")
+}
+
+func (cq *containerQueue) apiUpdate(uuid, action string) error {
+	var resp arvados.Container
+	err := cq.Client.RequestAndDecode(&resp, "POST", "arvados/v1/containers/"+uuid+"/"+action, nil, nil)
+	if err != nil {
+		return err
+	}
+
+	cq.mtx.Lock()
+	defer cq.mtx.Unlock()
+	if cq.keeplocal != nil {
+		cq.keeplocal[uuid] = struct{}{}
+	}
+	if ctr, ok := cq.current[uuid]; !ok {
+		cq.current[uuid] = &resp
+	} else {
+		ctr.State, ctr.Priority, ctr.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID
+	}
+	return nil
+}
+
+func (cq *containerQueue) poll() (map[string]*arvados.Container, error) {
+	cq.mtx.Lock()
+	size := len(cq.current)
+	cq.mtx.Unlock()
+
+	next := make(map[string]*arvados.Container, size)
+	apply := func(updates []arvados.Container) {
+		for _, upd := range updates {
+			if next[ctr.UUID] == nil {
+				next[ctr.UUID] = &arvados.Container{}
+			}
+			*next[ctr.UUID] = upd
+		}
+	}
+	selectParam := []string{"uuid", "state", "priority"}
+	limitParam := 1000
+
+	mine, err := cq.fetchAll(arvados.ResourceListParams{
+		Select:  selectParam,
+		Order:   []string{"uuid"},
+		Limit:   &limitParam,
+		Count:   "none",
+		Filters: {{"locked_by_uuid", "=", cq.authUUID}},
+	})
+	if err != nil {
+		return nil, err
+	}
+	apply(mine)
+
+	avail, err := cq.fetchAll(arvados.ResourceListParams{
+		Select:  selectParam,
+		Order:   []string{"uuid"},
+		Limit:   &limitParam,
+		Count:   "none",
+		Filters: {{"state", "=", Queued}, {"priority", ">", "0"}},
+	})
+	if err != nil {
+		return err
+	}
+	apply(avail)
+
+	var missing []string
+	cq.mtx.Lock()
+	for uuid, ctr := range cq.current {
+		if next[uuid] == nil &&
+			ctr.State != arvados.ContainerStateCancelled &&
+			ctr.state != arvados.ContainerStateComplete {
+			missing = append(missing, uuid)
+		}
+	}
+	cq.mtx.Unlock()
+
+	for i, page := 0, 20; i < len(missing); i += page {
+		batch := missing[i:]
+		if len(batch) > page {
+			batch = batch[:page]
+		}
+		ended, err := cq.fetchAll(arvados.ResourceListParams{
+			Select:  selectParam,
+			Order:   []string{"uuid"},
+			Count:   "none",
+			Filters: {{"uuid", "in", batch}},
+		})
+		if err != nil {
+			return err
+		}
+		apply(ended)
+	}
+	return next, nil
+}
+
+func (cq *containerQueue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.Container, error) {
+	var results []arvados.Container
+	params := initialParams
+	params.Offset = 0
+	for {
+		// This list variable must be a new one declared
+		// inside the loop: otherwise, items in the API
+		// response would get deep-merged into the items
+		// loaded in previous iterations.
+		var list arvados.ContainerList
+
+		err := cq.Client.RequestAndDecode(&list, "GET", "arvados/v1/containers", nil, params)
+		if err != nil {
+			return nil, err
+		}
+		if len(list.Items) == 0 {
+			break
+		}
+
+		results = append(results, list.Items)
+		if len(params.Order) == 1 && params.Order[0] == "uuid" {
+			params.Filters = append(initialParams.Filters, []interface{}{"uuid", ">", list.Items[len(list.Items)-1].UUID})
+		} else {
+			params.Offset += len(list.Items)
+		}
+	}
+	return results, nil
+}
diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
new file mode 100644
index 000000000..90e2c799f
--- /dev/null
+++ b/lib/dispatchcloud/dispatcher.go
@@ -0,0 +1,61 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+	"net/http"
+
+	"git.curoverse.com/arvados.git/lib/cloud"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"github.com/Sirupsen/logrus"
+)
+
+type dispatcher struct {
+	Cluster     *arvados.Cluster
+	NodeProfile *arvados.NodeProfile
+
+	logger         logger
+	provider       cloud.Provider
+	workerPool     workerPool
+	queue          containerQueue
+	scheduler      scheduler
+	syncer         syncer
+	staleLockFixer staleLockFixer
+	httpHandler    http.Handler
+}
+
+// ServeHTTP implements service.Handler.
+func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	disp.setupOnce.Do(disp.setup)
+	disp.handler.ServeHTTP(w, r)
+}
+
+// CheckHealth implements service.Handler.
+func (disp *dispatcher) CheckHealth() error {
+	disp.setupOnce.Do(disp.setup)
+	return nil
+}
+
+func (disp *dispatcher) setup() {
+	disp.logger = logrus.StandardLogger()
+	disp.provider = &providerProxy{disp.logger, newProvider(disp.Cluster)}
+	disp.workerPool = &workerPool{disp.logger, disp.provider}
+	disp.queue = &containerQueue{disp.logger, disp.Cluster}
+	disp.scheduler = &scheduler{disp.logger, disp.queue, disp.workerPool}
+	disp.syncer = &syncer{disp.logger, disp.queue, disp.workerPool}
+	disp.staleLockFixer = &staleLockFixer{disp.logger, disp.queue, disp.workerPool}
+
+	go func() {
+		disp.workerPool.Start()
+		// staleLockFixer must be ready before scheduler can start
+		disp.staleLockFixer.Wait()
+		go disp.scheduler.Run()
+		go disp.syncer.Run()
+	}()
+
+	mux := http.NewServeMux()
+	mux.Handle("/status.json", disp.serveStatusJSON)
+	disp.httpHandler = mux
+}
diff --git a/lib/dispatchcloud/logger.go b/lib/dispatchcloud/logger.go
new file mode 100644
index 000000000..90bb6ca68
--- /dev/null
+++ b/lib/dispatchcloud/logger.go
@@ -0,0 +1,29 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+	"sync"
+	"time"
+)
+
+type logger interface {
+	Printf(string, ...interface{})
+	Warnf(string, ...interface{})
+	Debugf(string, ...interface{})
+}
+
+var nextSpam = map[string]time.Time{}
+var nextSpamMtx sync.Mutex
+
+func unspam(msg string) bool {
+	nextSpamMtx.Lock()
+	defer nextSpamMtx.Unlock()
+	if nextSpam[msg].Before(time.Now()) {
+		nextSpam[msg] = time.Now().Add(time.Minute)
+		return true
+	}
+	return false
+}
diff --git a/lib/dispatchcloud/readme.go b/lib/dispatchcloud/readme.go
new file mode 100644
index 000000000..a4b005eb8
--- /dev/null
+++ b/lib/dispatchcloud/readme.go
@@ -0,0 +1,79 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+// A dispatcher comprises a container queue, a scheduler, a worker
+// pool, a cloud provider, a stale-lock fixer, and a syncer.
+// 1. Choose a provider.
+// 2. Start a worker pool.
+// 3. Start a container queue.
+// 4. Run a stale-lock fixer.
+// 5. Start a scheduler.
+// 6. Start a syncer.
+//
+//
+// A provider (cloud driver) creates new cloud VM instances and gets
+// the latest list of instances. The returned instances implement
+// proxies to the provider's metadata and control interfaces (get IP
+// address, update tags, shutdown).
+//
+//
+// A workerPool tracks workers' instance types and readiness states
+// (available to do work now, booting, suffering a temporary network
+// outage, shutting down). It loads internal state from the cloud
+// provider's list of instances at startup, and syncs periodically
+// after that.
+//
+//
+// A worker maintains a multiplexed SSH connection to a cloud
+// instance, retrying/reconnecting as needed, so the workerPool can
+// execute commands. It asks the provider's instance to verify its SSH
+// public key once when first connecting, and again later if the key
+// changes.
+//
+//
+// A container queue tracks the known state (according to
+// arvados-controller) of each container of interest -- i.e., queued,
+// or locked/running using our own dispatch token. It also proxies the
+// dispatcher's lock/unlock/cancel requests to the controller. It
+// handles concurrent refresh and update operations without exposing
+// out-of-order updates to its callers. (It drops any new information
+// that might have originated before its own most recent
+// lock/unlock/cancel operation.)
+//
+//
+// A stale-lock fixer waits for any already-locked containers (i.e.,
+// locked by a prior server process) to appear on workers as the
+// worker pool recovers its state. It unlocks/requeues any that still
+// remain when all workers are recovered or shutdown, or its timer
+// expires.
+//
+//
+// A scheduler chooses which containers to assign to which idle
+// workers, and decides what to do when there are not enough idle
+// workers (including shutting down some idle nodes).
+//
+//
+// A syncer updates state to Cancelled when a running container
+// process dies without finalizing its entry in the controller
+// database. It also calls the worker pool to kill containers that
+// have priority=0 while locked or running.
+//
+//
+// A provider proxy wraps a provider with rate-limiting logic. After
+// the wrapped provider receives a cloud.RateLimitError, the proxy
+// starts returning errors to callers immediately without calling
+// through to the wrapped provider.
+//
+//
+// TBD: Bootstrapping script via SSH, too? Future version.
+//
+// TBD: drain instance, keep instance alive
+// TBD: metrics, diagnostics
+// TBD: why dispatch token currently passed to worker?
+//
+// Metrics: queue size, time job has been in queued, #idle/busy/booting nodes
+// Timing in each step, and end-to-end
+// Metrics: boot/idle/alloc time and cost
diff --git a/lib/dispatchcloud/scheduler.go b/lib/dispatchcloud/scheduler.go
new file mode 100644
index 000000000..a7dd3f244
--- /dev/null
+++ b/lib/dispatchcloud/scheduler.go
@@ -0,0 +1,155 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+	"sort"
+	"sync"
+	"time"
+
+	"git.curoverse.com/arvados.git/lib/cloud"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+type container struct {
+	container arvados.Container
+	wantsType arvados.InstanceType
+}
+
+func (c *container) String() string {
+	return c.container.UUID
+}
+
+// A scheduler assigns queued containers to available workers, and
+// creates new workers when there aren't enough available.
+//
+// If it encounters problems creating new workers, the scheduler also
+// shuts down idle workers in case they are consuming quota.
+// Otherwise, workers are responsible for shutting themselves down
+// after the configured idle time threshold.
+type scheduler struct {
+	logger
+	containerQueue
+	workerPool
+
+	queue           []*container
+	queueMtx        sync.Mutex
+	instances       map[cloud.Instance]struct{}
+	enqueueOrUpdate chan container
+	runOnce         sync.Once
+}
+
+func (sched *scheduler) try() {
+	ctrs := sched.containerQueue.Current()
+	queue := make([]arvados.Container, 0, len(ctrs))
+	for uuid, ctr := range ctrs {
+		queue = append(queue, ctr)
+	}
+	sort.Slice(queue, func(i, j int) bool {
+		queue[i].Priority > queue[j].Priority
+	})
+	dibs := map[arvados.InstanceType]int{}
+	for _, ctr := range queue {
+		switch {
+		case ctr.State == arvados.ContainerStateQueued && ctr.Priority > 0:
+			it, err := ChooseInstanceType(sched.Cluster, &ctr)
+			if err != nil {
+				sched.logger.Warnf("cannot run %s", &ctr)
+				continue
+			}
+
+			if dibs[it] >= sched.workerPool.Pending(it) {
+				err := sched.workerPool.Create(it)
+				if err != nil {
+					if unspam(err.Error()) {
+						sched.logger.Warnf("scheduler: workerPool.Create: %s", err)
+					}
+					return
+				}
+			}
+			dibs[it]++
+			// ...
+		case ctr.State == arvados.ContainerStateLocked || ctr.State == arvados.ContainerStateRunning:
+			// ...
+		}
+	}
+}
+
+func (sched *scheduler) setup() {
+	sched.enqueueOrUpdate = make(chan container, 1)
+	go sched.run()
+}
+
+func (sched *scheduler) run() {
+	wakeup := make(chan struct{}, 1)
+	workers := map[*worker]*container{}
+	ctrs := map[string]*container{} // key is container UUID
+	timer := time.NewTimer(time.Second)
+	queue := []*container{}
+	for {
+		select {
+		case ctr := <-sched.enqueueOrUpdate:
+			// Get a newly queued container, or update
+			// priority/state.
+			if ctrs[ctr.UUID] == nil {
+				ctrs[ctr.UUID] = &ctr
+			} else {
+				ctrs[ctr.UUID].container = ctr.container
+				continue
+			}
+		case <-timer.C:
+		case <-wakeup:
+		}
+
+		queue = queue[:0]
+		for _, ctr := range ctrs {
+			if ctr.State == arvados.ContainerStateLocked {
+				queue = append(queue, ctr)
+			}
+		}
+		sort.Slice(queue, func(i, j int) bool {
+			if d := queue[i].Priority - queue[j].Priority; d != 0 {
+				return d > 0
+			} else {
+				return queue[i].UUID > queue[j].UUID
+			}
+		})
+
+		// Dispatch highest priority container to the idle
+		// worker with the shortest idle time.
+		for len(queue) > 0 {
+			select {
+			case todo[queue[0].wantsType] <- queue[0]:
+				queue = queue[1:]
+				continue
+			default:
+			}
+			break
+		}
+
+		// Compare queue to booting.
+	}
+}
+
+// DispatchFunc returns a dispatch.DispatchFunc.
+func (sched *scheduler) DispatchFunc(cluster *arvados.Cluster) func(actr arvados.Container, update <-chan arvados.Container) {
+	go sched.runOnce.Do(sched.run)
+	return func(actr arvados.Container, update <-chan arvados.Container) {
+		it, err := ChooseInstanceType(cluster, &actr)
+		if err != nil {
+			return err
+		}
+		sched.enqueueOrUpdate <- container{
+			container: actr,
+			wantsType: it,
+		}
+		for actr := range update {
+			sched.enqueueOrUpdate <- container{
+				container: actr,
+				wantsType: it,
+			}
+		}
+	}
+}
diff --git a/lib/dispatchcloud/test/lame_provider.go b/lib/dispatchcloud/test/lame_provider.go
new file mode 100644
index 000000000..996a63821
--- /dev/null
+++ b/lib/dispatchcloud/test/lame_provider.go
@@ -0,0 +1,118 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package test
+
+import (
+	"fmt"
+	"math/rand"
+	"sync"
+
+	"git.curoverse.com/arvados.git/lib/cloud"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"golang.org/x/crypto/ssh"
+)
+
+// LameInstanceSet creates instances that boot but can't run
+// containers.
+type LameInstanceSet struct {
+	Hold chan bool // set to make(chan bool) to hold operations until Release is called
+
+	mtx       sync.Mutex
+	instances map[*lameInstance]bool
+}
+
+// Create returns a new instance.
+func (p *LameInstanceSet) Create(instType arvados.InstanceType, imageID cloud.ImageID, tags cloud.InstanceTags, pubkey ssh.PublicKey) (cloud.Instance, error) {
+	inst := &lameInstance{
+		p:            p,
+		id:           cloud.InstanceID(fmt.Sprintf("lame-%x", rand.Uint64())),
+		providerType: instType.ProviderType,
+	}
+	inst.SetTags(tags)
+	if p.Hold != nil {
+		p.Hold <- true
+	}
+	p.mtx.Lock()
+	defer p.mtx.Unlock()
+	if p.instances == nil {
+		p.instances = map[*lameInstance]bool{}
+	}
+	p.instances[inst] = true
+	return inst, nil
+}
+
+// Instances returns the instances that haven't been destroyed.
+func (p *LameInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
+	p.mtx.Lock()
+	defer p.mtx.Unlock()
+	var instances []cloud.Instance
+	for i := range p.instances {
+		instances = append(instances, i)
+	}
+	return instances, nil
+}
+
+// Stop is a no-op, but exists to satisfy cloud.InstanceSet.
+func (p *LameInstanceSet) Stop() {
+}
+
+// Release n held calls. Blocks if n calls aren't already
+// waiting. Blocks forever if Hold is nil.
+func (p *LameInstanceSet) Release(n int) {
+	for i := 0; i < n; i++ {
+		<-p.Hold
+	}
+}
+
+type lameInstance struct {
+	p            *LameInstanceSet
+	id           cloud.InstanceID
+	providerType string
+	tags         cloud.InstanceTags
+}
+
+func (inst *lameInstance) ID() cloud.InstanceID {
+	return inst.id
+}
+
+func (inst *lameInstance) String() string {
+	return fmt.Sprint(inst.id)
+}
+
+func (inst *lameInstance) ProviderType() string {
+	return inst.providerType
+}
+
+func (inst *lameInstance) Address() string {
+	return "0.0.0.0:1234"
+}
+
+func (inst *lameInstance) SetTags(tags cloud.InstanceTags) error {
+	inst.p.mtx.Lock()
+	defer inst.p.mtx.Unlock()
+	inst.tags = cloud.InstanceTags{}
+	for k, v := range tags {
+		inst.tags[k] = v
+	}
+	return nil
+}
+
+func (inst *lameInstance) Destroy() error {
+	if inst.p.Hold != nil {
+		inst.p.Hold <- true
+	}
+	inst.p.mtx.Lock()
+	defer inst.p.mtx.Unlock()
+	delete(inst.p.instances, inst)
+	return nil
+}
+
+func (inst *lameInstance) Tags() cloud.InstanceTags {
+	return inst.tags
+}
+
+func (inst *lameInstance) VerifyPublicKey(ssh.PublicKey, *ssh.Client) error {
+	return nil
+}
diff --git a/lib/dispatchcloud/worker/gocheck_test.go b/lib/dispatchcloud/worker/gocheck_test.go
new file mode 100644
index 000000000..b4ca66c97
--- /dev/null
+++ b/lib/dispatchcloud/worker/gocheck_test.go
@@ -0,0 +1,16 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+	"testing"
+
+	check "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+	check.TestingT(t)
+}
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
new file mode 100644
index 000000000..43412bf6e
--- /dev/null
+++ b/lib/dispatchcloud/worker/pool.go
@@ -0,0 +1,332 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+	"sync"
+	"time"
+
+	"git.curoverse.com/arvados.git/lib/cloud"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"github.com/Sirupsen/logrus"
+)
+
+type Pool struct {
+	Cluster     *arvados.Cluster
+	Logger      logrus.FieldLogger
+	InstanceSet cloud.InstanceSet
+	ImageID     cloud.ImageID
+
+	subscribers map[<-chan struct{}]chan<- struct{}
+	creating    map[arvados.InstanceType]int // goroutines waiting for (InstanceSet)Create to return
+	workers     map[cloud.InstanceID]*worker
+	loaded      bool // loaded list of instances from InstanceSet at least once
+	stop        chan bool
+	mtx         sync.RWMutex
+	setupOnce   sync.Once
+}
+
+// Subscribe returns a channel that becomes ready whenever a worker's
+// state changes.
+//
+// Example:
+//
+//	ch := wp.Subscribe()
+//	for range ch {
+//		// some worker has become available; try scheduling some work
+//		if wantStop {
+//			wp.Unsubscribe(ch)
+//			break
+//		}
+//	}
+func (wp *Pool) Subscribe() <-chan struct{} {
+	wp.setupOnce.Do(wp.setup)
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
+	ch := make(chan struct{}, 1)
+	wp.subscribers[ch] = ch
+	return ch
+}
+
+// Unsubscribe stops sending updates to the given channel.
+func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
+	wp.setupOnce.Do(wp.setup)
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
+	delete(wp.subscribers, ch)
+}
+
+// Unallocated returns the number of unallocated (booting + idle +
+// unknown) instances of the given type.
+func (wp *Pool) Unallocated(it arvados.InstanceType) int {
+	wp.setupOnce.Do(wp.setup)
+	wp.mtx.RLock()
+	defer wp.mtx.RUnlock()
+	n := 0
+	for _, wkr := range wp.workers {
+		if wkr.instType == it && len(wkr.running) == 0 && (wkr.state == StateRunning || wkr.state == StateBooting || wkr.state == StateUnknown) {
+			n++
+		}
+	}
+	return n + wp.creating[it]
+}
+
+// Create a new instance with the given type, and add it to the worker
+// pool. The worker is added immediately; instance creation runs in
+// the background.
+func (wp *Pool) Create(it arvados.InstanceType) error {
+	wp.setupOnce.Do(wp.setup)
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
+	tags := cloud.InstanceTags{"InstanceType": it.Name}
+	wp.creating[it]++
+	go func() {
+		inst, err := wp.InstanceSet.Create(it, wp.ImageID, tags, nil)
+		wp.mtx.Lock()
+		defer wp.mtx.Unlock()
+		wp.creating[it]--
+		if err != nil {
+			wp.Logger.Errorf("worker.Pool: create instance: %s", err)
+			go wp.notify()
+			return
+		}
+		wp.updateWorker(inst, it, StateBooting)
+	}()
+	return nil
+}
+
+func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, state State) {
+	id := inst.ID()
+	if wp.workers[id] != nil {
+		wp.workers[id].instance = inst
+		wp.workers[id].updated = time.Now()
+		return
+	}
+	wp.Logger.Debugf("worker.Pool: instance %q appeared with InstanceType %q -- adding with state %q", inst, it.Name, state)
+	wp.workers[id] = &worker{
+		logger:   wp.Logger,
+		state:    state,
+		instance: inst,
+		instType: it,
+		busy:     time.Now(),
+		updated:  time.Now(),
+	}
+	go wp.notify()
+}
+
+// Shutdown shuts down a worker with the given type, or returns false
+// if all workers with the given type are busy.
+func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
+	wp.setupOnce.Do(wp.setup)
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
+	wp.Logger.Debugf("worker.Pool: Shutdown(%s)", it.Name)
+	for _, tryState := range []State{StateBooting, StateRunning} {
+		for _, wkr := range wp.workers {
+			if wkr.state != tryState || len(wkr.running) > 0 {
+				continue
+			}
+			if wkr.instType != it {
+				continue
+			}
+			go wkr.instance.Destroy()
+			go wp.notify()
+			wkr.state = StateShutdown
+			wkr.updated = time.Now()
+			return true
+		}
+	}
+	return false
+}
+
+// StartContainer starts a container on an idle worker immediately if
+// possible, otherwise returns false.
+func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
+	wp.setupOnce.Do(wp.setup)
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
+	var wkr *worker
+	for _, w := range wp.workers {
+		if w.instType == it && w.state == StateRunning && len(w.running) == 0 && !w.starting {
+			if wkr == nil || w.busy.After(wkr.busy) {
+				wkr = w
+			}
+		}
+	}
+	if wkr == nil {
+		return false
+	}
+	wp.Logger.Debugf("worker.Pool: start container %s on instance %s", ctr.UUID, wkr.instance)
+	wkr.starting = true
+	wkr.updated = time.Now()
+	go func() {
+		_, stderr, err := wkr.execute("crunch-run '"+ctr.UUID+"' &", nil)
+		if err != nil {
+			wp.Logger.Errorf("worker.Pool: error starting container %s on instance %s: %s (stderr %q)", ctr.UUID, wkr.instance, err, stderr)
+		}
+		wp.mtx.Lock()
+		if err == nil {
+			wkr.running = append(wkr.running, ctr.UUID)
+			wkr.updated = time.Now()
+		}
+		wkr.starting = false
+		wp.mtx.Unlock()
+	}()
+	return true
+}
+
+// Stop synchronizing with the InstanceSet.
+func (wp *Pool) Stop() {
+	wp.setupOnce.Do(wp.setup)
+	close(wp.stop)
+}
+
+func (wp *Pool) setup() {
+	wp.creating = map[arvados.InstanceType]int{}
+	wp.workers = map[cloud.InstanceID]*worker{}
+	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
+}
+
+func (wp *Pool) notify() {
+	wp.mtx.RLock()
+	defer wp.mtx.RUnlock()
+	for _, send := range wp.subscribers {
+		select {
+		case send <- struct{}{}:
+		default:
+		}
+	}
+}
+
+func (wp *Pool) SyncLoop() {
+	wp.setupOnce.Do(wp.setup)
+	wait := time.Nanosecond
+	timer := time.NewTimer(time.Second)
+	for {
+		if !timer.Stop() {
+			<-timer.C
+		}
+		timer.Reset(wait)
+		wp.Logger.Debugf("worker.Pool: wait %s", wait)
+		select {
+		case <-timer.C:
+		case <-wp.stop:
+			return
+		}
+		err := wp.Sync()
+		if err != nil {
+			wp.Logger.Warnf("worker.Pool: error getting instance list: %s", err)
+			wait = 15 * time.Second
+		} else {
+			wait = time.Minute
+		}
+	}
+}
+
+func (wp *Pool) Sync() error {
+	wp.setupOnce.Do(wp.setup)
+	wp.Logger.Debugf("worker.Pool: getting instance list")
+	threshold := time.Now()
+	instances, err := wp.InstanceSet.Instances(cloud.InstanceTags{})
+	if err != nil {
+		return err
+	}
+	wp.sync(threshold, instances)
+	return nil
+}
+
+func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
+
+	for _, inst := range instances {
+		itTag := inst.Tags()["InstanceType"]
+		it, ok := wp.Cluster.InstanceTypes[itTag]
+		if !ok {
+			log := wp.Logger.Debugf
+			if wp.workers[inst.ID()] != nil {
+				log = wp.Logger.Errorf
+			}
+			log("worker.Pool: instance %q has unknown InstanceType tag %q --- ignoring", inst, itTag)
+			continue
+		}
+		wp.updateWorker(inst, it, StateUnknown)
+	}
+
+	for id, wkr := range wp.workers {
+		if wkr.updated.After(threshold) {
+			continue
+		}
+		wp.Logger.Infof("worker.Pool: instance %q disappeared, shutting down worker with state %q", wkr.instance, wkr.state)
+		wkr.state = StateShutdown
+		delete(wp.workers, id)
+		go wkr.instance.Destroy()
+		go wp.notify()
+	}
+
+	if !wp.loaded {
+		wp.loaded = true
+		wp.Logger.Infof("worker.Pool: loaded initial set of instances (%d) from InstanceSet", len(wp.workers))
+	}
+}
+
+// should be called in a new goroutine
+func (wp *Pool) probeAndUpdate(wkr *worker) {
+	wp.mtx.Lock()
+	updated := wkr.updated
+	booted := wkr.booted
+	wp.mtx.Unlock()
+
+	var err error
+	if !booted {
+		err = wkr.probeBooted(wp.Cluster.CloudVMs.BootProbeCommand)
+		if err == nil {
+			booted = true
+			wp.Logger.Infof("worker.Pool: instance %q booted", wkr.instance)
+		}
+	}
+	var running []string
+	if err == nil && booted {
+		running, err = wkr.probeRunning()
+	}
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
+	defer func() { wkr.updated = time.Now() }()
+	wkr.booted = booted
+	if err != nil {
+		if wkr.state != StateShutdown {
+			elapsed := time.Since(wkr.probed)
+			wp.Logger.Infof("worker.Pool: instance %q not responding for %s: %s", wkr.instance, elapsed, err)
+
+			label, threshold := "", maxPingFailTime
+			if wkr.state == StateBooting {
+				label, threshold = "new ", maxBootTime
+			}
+			if elapsed > threshold {
+				wp.Logger.Warnf("worker.Pool: %sinstance %q unresponsive since %s; shutting down", label, wkr.instance, wkr.probed)
+				wkr.state = StateShutdown
+				go wkr.instance.Destroy()
+				go wp.notify()
+			}
+		}
+		return
+	}
+	wkr.probed = time.Now()
+	if len(running) > 0 {
+		wkr.busy = time.Now()
+	}
+	if wkr.state == StateShutdown {
+	} else if booted {
+		wkr.state = StateRunning
+	} else {
+		wkr.state = StateBooting
+	}
+	if updated == wkr.updated {
+		// We haven't started any new work since starting the
+		// probe, so this is the latest available information.
+		wkr.running = running
+	}
+	go wp.notify()
+}
diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
new file mode 100644
index 000000000..4d552aa66
--- /dev/null
+++ b/lib/dispatchcloud/worker/pool_test.go
@@ -0,0 +1,108 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+	"time"
+
+	"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"github.com/Sirupsen/logrus"
+	check "gopkg.in/check.v1"
+)
+
+const GiB arvados.ByteSize = 1 << 30
+
+var _ = check.Suite(&PoolSuite{})
+
+type PoolSuite struct{}
+
+func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
+	logrus.StandardLogger().SetLevel(logrus.DebugLevel)
+	provider := &test.LameInstanceSet{Hold: make(chan bool)}
+	type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
+	type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .01}
+	pool := &Pool{
+		Cluster: &arvados.Cluster{
+			InstanceTypes: arvados.InstanceTypeMap{
+				type1.Name: type1,
+				type2.Name: type2,
+			},
+			CloudVMs: arvados.CloudVMs{
+				BootProbeCommand: "true",
+			},
+		},
+		Logger:      logrus.StandardLogger(),
+		InstanceSet: provider,
+	}
+	notify := pool.Subscribe()
+	defer pool.Unsubscribe(notify)
+	notify2 := pool.Subscribe()
+	defer pool.Unsubscribe(notify2)
+
+	c.Check(pool.Unallocated(type1), check.Equals, 0)
+	c.Check(pool.Unallocated(type2), check.Equals, 0)
+	pool.Create(type2)
+	pool.Create(type1)
+	pool.Create(type2)
+	c.Check(pool.Unallocated(type1), check.Equals, 1)
+	c.Check(pool.Unallocated(type2), check.Equals, 2)
+	// Unblock the pending Create calls and (before calling Sync!)
+	// wait for the pool to process the returned instances.
+	go provider.Release(3)
+	suite.wait(c, pool, notify, func() bool {
+		list, err := provider.Instances(nil)
+		return err == nil && len(list) == 3
+	})
+
+	c.Check(pool.Unallocated(type1), check.Equals, 1)
+	c.Check(pool.Unallocated(type2), check.Equals, 2)
+	pool.Sync()
+	c.Check(pool.Unallocated(type1), check.Equals, 1)
+	c.Check(pool.Unallocated(type2), check.Equals, 2)
+
+	c.Check(pool.Shutdown(type2), check.Equals, true)
+	suite.wait(c, pool, notify, func() bool {
+		return pool.Unallocated(type1) == 1 && pool.Unallocated(type2) == 1
+	})
+	c.Check(pool.Shutdown(type2), check.Equals, true)
+	suite.wait(c, pool, notify, func() bool {
+		return pool.Unallocated(type1) == 1 && pool.Unallocated(type2) == 0
+	})
+	c.Check(pool.Shutdown(type2), check.Equals, false)
+	for {
+		// Consume any waiting notifications to ensure the
+		// next one we get is from Shutdown.
+		select {
+		case <-notify:
+			continue
+		default:
+		}
+		break
+	}
+	c.Check(pool.Shutdown(type1), check.Equals, true)
+	suite.wait(c, pool, notify, func() bool {
+		return pool.Unallocated(type1) == 0 && pool.Unallocated(type2) == 0
+	})
+	select {
+	case <-notify2:
+	case <-time.After(time.Second):
+		c.Error("notify did not receive")
+	}
+	go provider.Release(3) // unblock Destroy calls
+}
+
+func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) {
+	timeout := time.NewTimer(time.Second).C
+	for !ready() {
+		select {
+		case <-notify:
+			continue
+		case <-timeout:
+		}
+		break
+	}
+	c.Check(ready(), check.Equals, true)
+}
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
new file mode 100644
index 000000000..7f3e20e2b
--- /dev/null
+++ b/lib/dispatchcloud/worker/worker.go
@@ -0,0 +1,178 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+	"bytes"
+	"errors"
+	"io"
+	"net"
+	"strings"
+	"sync"
+	"time"
+
+	"git.curoverse.com/arvados.git/lib/cloud"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"github.com/Sirupsen/logrus"
+	"golang.org/x/crypto/ssh"
+)
+
+type State int
+
+const (
+	StateUnknown  State = iota // might be running a container already
+	StateBooting               // instance is booting
+	StateRunning               // instance is running
+	StateShutdown              // worker has stopped monitoring the instance
+
+	// TODO: configurable
+	maxPingFailTime = 10 * time.Minute
+	maxBootTime     = 20 * time.Minute
+)
+
+var stateString = map[State]string{
+	StateUnknown:  "unknown",
+	StateBooting:  "booting",
+	StateRunning:  "running",
+	StateShutdown: "shutdown",
+}
+
+// String implements fmt.Stringer.
+func (s State) String() string {
+	return stateString[s]
+}
+
+type worker struct {
+	logger   logrus.FieldLogger
+	state    State
+	instance cloud.Instance
+	instType arvados.InstanceType
+	booted   bool
+	probed   time.Time
+	updated  time.Time
+	busy     time.Time
+	running  []string
+	starting bool
+
+	client      *ssh.Client
+	clientErr   error
+	clientOnce  sync.Once
+	clientSetup chan bool
+	publicKey   ssh.PublicKey
+}
+
+// Create a new SSH session. If session setup fails or the SSH client
+// hasn't been setup yet, setup a new SSH client and try again.
+func (wkr *worker) newSession() (*ssh.Session, error) {
+	try := func(create bool) (*ssh.Session, error) {
+		client, err := wkr.sshClient(create)
+		if err != nil {
+			return nil, err
+		}
+		return client.NewSession()
+	}
+	session, err := try(false)
+	if err != nil {
+		session, err = try(true)
+	}
+	return session, err
+}
+
+// Get the latest SSH client. If another goroutine is in the process
+// of setting one up, wait for it to finish and return its result (or
+// the last successfully setup client, if it fails).
+func (wkr *worker) sshClient(create bool) (*ssh.Client, error) {
+	wkr.clientOnce.Do(func() {
+		wkr.clientSetup = make(chan bool, 1)
+		wkr.clientErr = errors.New("client not yet created")
+	})
+	defer func() { <-wkr.clientSetup }()
+	select {
+	case wkr.clientSetup <- true:
+		if create {
+			client, err := wkr.setupSSHClient()
+			if err == nil || wkr.client == nil {
+				wkr.client, wkr.clientErr = client, err
+			}
+			if err != nil {
+				return nil, err
+			}
+		}
+	default:
+		// Another goroutine is doing the above case.  Wait
+		// for it to finish and return whatever it leaves in
+		// wkr.client.
+		wkr.clientSetup <- true
+	}
+	return wkr.client, wkr.clientErr
+}
+
+// Create a new SSH client.
+func (wkr *worker) setupSSHClient() (*ssh.Client, error) {
+	addr := wkr.instance.Address()
+	if addr == "" {
+		return nil, errors.New("instance has no address")
+	}
+	var receivedKey ssh.PublicKey
+	client, err := ssh.Dial("tcp", addr, &ssh.ClientConfig{
+		User: "root",
+		Auth: []ssh.AuthMethod{
+			ssh.Password("1234"),
+		},
+		HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error {
+			receivedKey = key
+			return nil
+		},
+		Timeout: time.Minute,
+	})
+	if err != nil {
+		return nil, err
+	} else if receivedKey == nil {
+		return nil, errors.New("BUG: key was never provided to HostKeyCallback")
+	}
+
+	if wkr.publicKey == nil || !bytes.Equal(wkr.publicKey.Marshal(), receivedKey.Marshal()) {
+		err = wkr.instance.VerifyPublicKey(receivedKey, client)
+		if err != nil {
+			return nil, err
+		}
+		wkr.publicKey = receivedKey
+	}
+	return client, nil
+}
+
+func (wkr *worker) execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
+	session, err := wkr.newSession()
+	if err != nil {
+		return nil, nil, err
+	}
+	defer session.Close()
+	var stdout, stderr bytes.Buffer
+	session.Stdout = &stdout
+	session.Stderr = &stderr
+	if stdin != nil {
+		session.Stdin = stdin
+	}
+	wkr.logger.Debugf("worker: instance %s execute %q", wkr.instance, cmd)
+	err = session.Run(cmd)
+	return stdout.Bytes(), stderr.Bytes(), err
+}
+
+func (wkr *worker) probeRunning() (running []string, err error) {
+	stdout, _, err := wkr.execute("crunch-run --probe", nil)
+	if err != nil {
+		return
+	}
+	running = strings.Split(string(bytes.TrimRight(stdout, "\n")), "\n")
+	return
+}
+
+func (wkr *worker) probeBooted(cmd string) error {
+	if cmd == "" {
+		cmd = "true"
+	}
+	_, _, err := wkr.execute(cmd, nil)
+	return err
+}
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index 6edd18418..e5b4dead1 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -55,6 +55,7 @@ type Cluster struct {
 	ManagementToken    string
 	NodeProfiles       map[string]NodeProfile
 	InstanceTypes      InstanceTypeMap
+	CloudVMs           CloudVMs
 	HTTPRequestTimeout Duration
 	RemoteClusters     map[string]RemoteCluster
 	PostgreSQL         PostgreSQL
@@ -89,6 +90,16 @@ type InstanceType struct {
 	Preemptible  bool
 }
 
+type CloudVMs struct {
+	// Shell command that exits zero IFF the VM is fully booted
+	// and ready to run containers, e.g., "mount | grep
+	// /encrypted-tmp"
+	BootProbeCommand string
+
+	Driver           string
+	DriverParameters map[string]interface{}
+}
+
 type InstanceTypeMap map[string]InstanceType
 
 var errDuplicateInstanceTypeName = errors.New("duplicate instance type name")
diff --git a/sdk/go/arvados/container.go b/sdk/go/arvados/container.go
index 210ed9981..ef3547048 100644
--- a/sdk/go/arvados/container.go
+++ b/sdk/go/arvados/container.go
@@ -18,7 +18,7 @@ type Container struct {
 	Mounts               map[string]Mount     `json:"mounts"`
 	Output               string               `json:"output"`
 	OutputPath           string               `json:"output_path"`
-	Priority             int                  `json:"priority"`
+	Priority             int64                `json:"priority"`
 	RuntimeConstraints   RuntimeConstraints   `json:"runtime_constraints"`
 	State                ContainerState       `json:"state"`
 	SchedulingParameters SchedulingParameters `json:"scheduling_parameters"`

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list