[ARVADOS] updated: 1.2.0-9-g587678dee
Git user
git at public.curoverse.com
Fri Aug 24 15:35:52 EDT 2018
Summary of changes:
build/run-tests.sh | 1 +
lib/cloud/interfaces.go | 60 +++-
lib/dispatchcloud/test/lame_provider.go | 118 ++++++++
lib/dispatchcloud/{ => worker}/gocheck_test.go | 2 +-
lib/dispatchcloud/worker/pool.go | 379 ++++++++-----------------
lib/dispatchcloud/worker/pool_test.go | 101 +++++--
lib/dispatchcloud/worker/worker.go | 178 ++++++++++++
sdk/go/arvados/config.go | 11 +
8 files changed, 560 insertions(+), 290 deletions(-)
create mode 100644 lib/dispatchcloud/test/lame_provider.go
copy lib/dispatchcloud/{ => worker}/gocheck_test.go (90%)
create mode 100644 lib/dispatchcloud/worker/worker.go
discards 7fef859a22f5c27a4e5e08a9f7c9559dc5daca3d (commit)
via 587678dee4c33ad07db18897b71b732567e56661 (commit)
This update added new revisions after undoing existing revisions. That is
to say, the old revision is not a strict subset of the new revision. This
situation occurs when you --force push a change and generate a repository
containing something like this:
* -- * -- B -- O -- O -- O (7fef859a22f5c27a4e5e08a9f7c9559dc5daca3d)
\
N -- N -- N (587678dee4c33ad07db18897b71b732567e56661)
When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 587678dee4c33ad07db18897b71b732567e56661
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Fri Aug 24 15:35:38 2018 -0400
13964: sketch
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/build/run-build-packages.sh b/build/run-build-packages.sh
index caebac013..11588913e 100755
--- a/build/run-build-packages.sh
+++ b/build/run-build-packages.sh
@@ -295,6 +295,8 @@ package_go_binary cmd/arvados-server arvados-server \
"Arvados server daemons"
package_go_binary cmd/arvados-server arvados-controller \
"Arvados cluster controller daemon"
+package_go_binary cmd/arvados-server arvados-dispatch-cloud \
+ "Arvados cluster cloud dispatch"
package_go_binary sdk/go/crunchrunner crunchrunner \
"Crunchrunner executes a command inside a container and uploads the output"
package_go_binary services/arv-git-httpd arvados-git-httpd \
diff --git a/build/run-tests.sh b/build/run-tests.sh
index e669e326c..2ade688af 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -77,6 +77,7 @@ lib/cmd
lib/controller
lib/crunchstat
lib/dispatchcloud
+lib/dispatchcloud/worker
services/api
services/arv-git-httpd
services/crunchstat
@@ -917,6 +918,7 @@ gostuff=(
lib/controller
lib/crunchstat
lib/dispatchcloud
+ lib/dispatchcloud/worker
sdk/go/arvados
sdk/go/arvadosclient
sdk/go/blockdigest
diff --git a/cmd/arvados-server/arvados-dispatch-cloud.service b/cmd/arvados-server/arvados-dispatch-cloud.service
new file mode 100644
index 000000000..5ea5d45e7
--- /dev/null
+++ b/cmd/arvados-server/arvados-dispatch-cloud.service
@@ -0,0 +1,28 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+[Unit]
+Description=Arvados cloud dispatch
+Documentation=https://doc.arvados.org/
+After=network.target
+AssertPathExists=/etc/arvados/config.yml
+
+# systemd==229 (ubuntu:xenial) obeys StartLimitInterval in the [Unit] section
+StartLimitInterval=0
+
+# systemd>=230 (debian:9) obeys StartLimitIntervalSec in the [Unit] section
+StartLimitIntervalSec=0
+
+[Service]
+Type=notify
+EnvironmentFile=-/etc/arvados/environment
+ExecStart=/usr/bin/arvados-dispatch-cloud
+Restart=always
+RestartSec=1
+
+# systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section
+StartLimitInterval=0
+
+[Install]
+WantedBy=multi-user.target
diff --git a/cmd/arvados-server/cmd.go b/cmd/arvados-server/cmd.go
index 1af3745df..cd15d25dd 100644
--- a/cmd/arvados-server/cmd.go
+++ b/cmd/arvados-server/cmd.go
@@ -9,6 +9,7 @@ import (
"git.curoverse.com/arvados.git/lib/cmd"
"git.curoverse.com/arvados.git/lib/controller"
+ "git.curoverse.com/arvados.git/lib/dispatchcloud"
)
var (
@@ -18,7 +19,8 @@ var (
"-version": cmd.Version(version),
"--version": cmd.Version(version),
- "controller": controller.Command,
+ "controller": controller.Command,
+ "dispatch-cloud": dispatchcloud.Command,
})
)
diff --git a/lib/cloud/interfaces.go b/lib/cloud/interfaces.go
new file mode 100644
index 000000000..0a2ec47bd
--- /dev/null
+++ b/lib/cloud/interfaces.go
@@ -0,0 +1,140 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package cloud
+
+import (
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "golang.org/x/crypto/ssh"
+)
+
+// A RateLimitError should be returned by an InstanceSet when the
+// cloud service indicates it is rejecting all API calls for some time
+// interval.
+type RateLimitError interface {
+ // Time before which the caller should expect requests to
+ // fail.
+ EarliestRetry() time.Time
+ error
+}
+
+// A QuotaError should be returned by an InstanceSet when the cloud
+// service indicates the account cannot create more VMs than already
+// exist.
+type QuotaError interface {
+ // If true, don't create more instances until some existing
+ // instances are destroyed. If false, don't handle the error
+ // as a quota error.
+ IsQuotaError() bool
+ error
+}
+
+// Identifier types shared by drivers and the dispatcher.
+type InstanceSetID string // tags long-lived cloud resources belonging to one dispatcher
+type InstanceTags map[string]string
+type InstanceID string // cloud provider's instance identifier
+type ImageID string    // cloud provider's VM image identifier
+
+// Instance is implemented by the provider-specific instance types.
+type Instance interface {
+ // ID returns the provider's instance ID. It must be stable
+ // for the life of the instance.
+ ID() InstanceID
+
+ // String typically returns the cloud-provided instance ID.
+ String() string
+
+ // Cloud provider's "instance type" ID. Matches a ProviderType
+ // in the cluster's InstanceTypes configuration.
+ ProviderType() string
+
+ // Get current tags
+ Tags() InstanceTags
+
+ // Replace tags with the given tags
+ SetTags(InstanceTags) error
+
+ // Shut down the node
+ Destroy() error
+
+ // SSH server hostname or IP address, or empty string if
+ // unknown while instance is booting.
+ Address() string
+
+ // Return nil if the given public key matches the instance's
+ // SSH server key. If the provided Dialer is not nil,
+ // VerifyPublicKey can use it to make outgoing network
+ // connections from the instance -- e.g., to use the cloud's
+ // "this instance's metadata" API.
+ VerifyPublicKey(ssh.PublicKey, *ssh.Client) error
+}
+
+// An InstanceSet manages a set of VM instances created by an elastic
+// cloud provider like AWS, GCE, or Azure.
+//
+// All public methods of an InstanceSet, and all public methods of the
+// instances it returns, are goroutine safe.
+type InstanceSet interface {
+ // Create a new instance. If supported by the driver, add the
+ // provided public key to /root/.ssh/authorized_keys.
+ //
+ // The returned error should implement RateLimitError and
+ // QuotaError where applicable.
+ Create(arvados.InstanceType, ImageID, InstanceTags, ssh.PublicKey) (Instance, error)
+
+ // Return all instances, including ones that are booting or
+ // shutting down. Optionally, filter out nodes that don't have
+ // all of the given InstanceTags (the caller will ignore these
+ // anyway).
+ //
+ // An instance returned by successive calls to Instances() may
+ // -- but does not need to -- be represented by the same
+ // Instance object each time. Thus, the caller is responsible
+ // for de-duplicating the returned instances by comparing the
+ // InstanceIDs returned by the instances' ID() methods.
+ Instances(InstanceTags) ([]Instance, error)
+
+ // Stop any background tasks and release other resources.
+ Stop()
+}
+
+// A Driver returns an InstanceSet that uses the given
+// driver-dependent configuration parameters.
+//
+// The supplied id will be of the form "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+// where each z can be any alphanum. The returned InstanceSet must use
+// this id to tag long-lived cloud resources that it creates, and must
+// assume control of any existing resources that are tagged with the
+// same id. Tagging can be accomplished by including the ID in
+// resource names, using the cloud provider's tagging feature, or any
+// other mechanism. The tags must be visible to another instance of
+// the same driver running on a different host.
+//
+// The returned InstanceSet must ignore existing resources that are
+// visible but not tagged with the given id, except that it should log
+// a summary of such resources -- only once -- when it starts
+// up. Thus, two identically configured providers running on different
+// hosts with different ids should log about the existence of each
+// other's resources at startup, but will not interfere with each
+// other.
+//
+// Example:
+//
+// type provider struct {
+// ownID string
+// AccessKey string
+// }
+//
+// func ExampleDriver(config map[string]interface{}, id InstanceSetID) (InstanceSet, error) {
+// var p provider
+// if err := mapstructure.Decode(config, &p); err != nil {
+// return nil, err
+// }
+// p.ownID = id
+// return &p, nil
+// }
+//
+// var _ = registerCloudDriver("example", ExampleDriver)
+type Driver func(config map[string]interface{}, id InstanceSetID) (InstanceSet, error)
diff --git a/lib/dispatchcloud/cmd.go b/lib/dispatchcloud/cmd.go
new file mode 100644
index 000000000..b2bc91300
--- /dev/null
+++ b/lib/dispatchcloud/cmd.go
@@ -0,0 +1,17 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+ "git.curoverse.com/arvados.git/lib/cmd"
+ "git.curoverse.com/arvados.git/lib/service"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+var Command cmd.Handler = service.Command(arvados.ServiceNameDispatchCloud, newHandler)
+
+func newHandler(cluster *arvados.Cluster, np *arvados.NodeProfile) service.Handler {
+ return &dispatcher{Cluster: cluster, NodeProfile: np}
+}
diff --git a/lib/dispatchcloud/container_queue.go b/lib/dispatchcloud/container_queue.go
new file mode 100644
index 000000000..f8c0512b6
--- /dev/null
+++ b/lib/dispatchcloud/container_queue.go
@@ -0,0 +1,201 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+ "sync"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// containerQueue tracks the known state (according to
+// arvados-controller) of each container of interest, and proxies
+// lock/unlock requests to the controller.
+type containerQueue struct {
+	Logger  logger
+	Cluster *arvados.Cluster
+	Client  *arvados.Client
+
+	// UUID of our own dispatch token; poll() filters on
+	// locked_by_uuid with it.
+	// NOTE(review): nothing in this change populates authUUID --
+	// confirm where it gets set.
+	authUUID string
+
+	current   map[string]*arvados.Container // key is container UUID
+	mtx       sync.Mutex                    // guards current and keeplocal
+	keeplocal map[string]struct{}           // UUIDs updated locally while a poll is in flight
+}
+
+// Forget drops the given container from the local queue, but only if
+// its state is final (Complete or Cancelled).
+func (cq *containerQueue) Forget(uuid string) {
+	cq.mtx.Lock()
+	defer cq.mtx.Unlock()
+	ctr, ok := cq.current[uuid]
+	// An unknown UUID must be a no-op, not a nil dereference.
+	if !ok {
+		return
+	}
+	if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled {
+		delete(cq.current, uuid)
+	}
+}
+
+// Get returns a copy of the container with the given UUID, and
+// ok==false if it is not in the queue. The copy is safe for the
+// caller to use without holding cq.mtx.
+func (cq *containerQueue) Get(uuid string) (arvados.Container, bool) {
+	cq.mtx.Lock()
+	defer cq.mtx.Unlock()
+	ctr, ok := cq.current[uuid]
+	if !ok {
+		return arvados.Container{}, false
+	}
+	return *ctr, true
+}
+
+// All returns a copy of the whole queue, keyed by container UUID.
+// The values are copies, so callers cannot mutate or race on the
+// queue's internal state.
+func (cq *containerQueue) All() map[string]arvados.Container {
+	cq.mtx.Lock()
+	defer cq.mtx.Unlock()
+	// The result map holds values, matching the declared return
+	// type (the original built a map of pointers here).
+	ret := make(map[string]arvados.Container, len(cq.current))
+	for uuid, ctr := range cq.current {
+		ret[uuid] = *ctr
+	}
+	return ret
+}
+
+// setup initializes the queue map. cq.current is a plain map (not an
+// atomic.Value), so it is assigned directly.
+func (cq *containerQueue) setup() {
+	cq.current = map[string]*arvados.Container{}
+}
+
+// update refreshes the queue from the controller. While the poll is
+// in flight, lock/unlock responses are recorded in cq.keeplocal so
+// the (possibly stale) polled copies don't overwrite them.
+func (cq *containerQueue) update() error {
+	cq.mtx.Lock()
+	cq.keeplocal = map[string]struct{}{}
+	cq.mtx.Unlock()
+
+	defer func() { cq.mtx.Lock(); cq.keeplocal = nil; cq.mtx.Unlock() }()
+
+	next, err := cq.poll()
+	if err != nil {
+		return err
+	}
+	cq.mtx.Lock()
+	defer cq.mtx.Unlock()
+	// Keep our own locally-updated copies for any container that
+	// changed while the poll was running.
+	for uuid := range cq.keeplocal {
+		next[uuid] = cq.current[uuid]
+	}
+	cq.current = next
+	return nil
+}
+
+// Lock asks the controller to lock the given container using our
+// dispatch token.
+func (cq *containerQueue) Lock(uuid string) error {
+	return cq.apiUpdate(uuid, "lock")
+}
+
+// Unlock asks the controller to release our lock on the given
+// container, returning it to the queue.
+func (cq *containerQueue) Unlock(uuid string) error {
+	return cq.apiUpdate(uuid, "unlock")
+}
+
+// apiUpdate POSTs the given action ("lock" or "unlock") for the given
+// container to the controller, then merges the response into the
+// local queue.
+func (cq *containerQueue) apiUpdate(uuid, action string) error {
+	var resp arvados.Container
+	err := cq.Client.RequestAndDecode(&resp, "POST", "arvados/v1/containers/"+uuid+"/"+action, nil, nil)
+	if err != nil {
+		return err
+	}
+
+	cq.mtx.Lock()
+	defer cq.mtx.Unlock()
+	// If a poll is in flight, note that this UUID now has a local
+	// update the poll result must not clobber.
+	if cq.keeplocal != nil {
+		cq.keeplocal[uuid] = struct{}{}
+	}
+	if ctr, ok := cq.current[uuid]; !ok {
+		cq.current[uuid] = &resp
+	} else {
+		// Only merge the fields this response is authoritative
+		// for; Select'ed fields from poll() remain untouched.
+		ctr.State, ctr.Priority, ctr.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID
+	}
+	return nil
+}
+
+// poll fetches the current set of interesting containers from the
+// controller: containers locked by our dispatch token, runnable
+// queued containers, and any containers we were tracking that no
+// longer match either query (to learn their final state).
+func (cq *containerQueue) poll() (map[string]*arvados.Container, error) {
+	cq.mtx.Lock()
+	size := len(cq.current)
+	cq.mtx.Unlock()
+
+	next := make(map[string]*arvados.Container, size)
+	apply := func(updates []arvados.Container) {
+		for i := range updates {
+			// Copy the element so &upd is stable; the
+			// original indexed next by an undeclared "ctr".
+			upd := updates[i]
+			next[upd.UUID] = &upd
+		}
+	}
+	selectParam := []string{"uuid", "state", "priority"}
+	limitParam := 1000
+
+	// Containers locked by our own dispatch token.
+	mine, err := cq.fetchAll(arvados.ResourceListParams{
+		Select:  selectParam,
+		Order:   []string{"uuid"},
+		Limit:   &limitParam,
+		Count:   "none",
+		Filters: []arvados.Filter{{"locked_by_uuid", "=", cq.authUUID}},
+	})
+	if err != nil {
+		return nil, err
+	}
+	apply(mine)
+
+	// Queued containers that are runnable (priority > 0).
+	avail, err := cq.fetchAll(arvados.ResourceListParams{
+		Select:  selectParam,
+		Order:   []string{"uuid"},
+		Limit:   &limitParam,
+		Count:   "none",
+		Filters: []arvados.Filter{{"state", "=", arvados.ContainerStateQueued}, {"priority", ">", "0"}},
+	})
+	if err != nil {
+		return nil, err
+	}
+	apply(avail)
+
+	// Anything we were tracking that didn't appear above has
+	// either finished or been taken over; fetch those in batches
+	// to learn their final states.
+	var missing []string
+	cq.mtx.Lock()
+	for uuid, ctr := range cq.current {
+		if next[uuid] == nil &&
+			ctr.State != arvados.ContainerStateCancelled &&
+			ctr.State != arvados.ContainerStateComplete {
+			missing = append(missing, uuid)
+		}
+	}
+	cq.mtx.Unlock()
+
+	for i, page := 0, 20; i < len(missing); i += page {
+		batch := missing[i:]
+		if len(batch) > page {
+			batch = batch[:page]
+		}
+		ended, err := cq.fetchAll(arvados.ResourceListParams{
+			Select:  selectParam,
+			Order:   []string{"uuid"},
+			Count:   "none",
+			Filters: []arvados.Filter{{"uuid", "in", batch}},
+		})
+		if err != nil {
+			return nil, err
+		}
+		apply(ended)
+	}
+	return next, nil
+}
+
+// fetchAll retrieves every page of a container list. When the caller
+// ordered by UUID, it pages with a uuid > lastUUID filter (stable
+// even if rows change between pages); otherwise it falls back to
+// offset-based paging.
+func (cq *containerQueue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.Container, error) {
+	var results []arvados.Container
+	params := initialParams
+	params.Offset = 0
+	for {
+		// This list variable must be a new one declared
+		// inside the loop: otherwise, items in the API
+		// response would get deep-merged into the items
+		// loaded in previous iterations.
+		var list arvados.ContainerList
+
+		err := cq.Client.RequestAndDecode(&list, "GET", "arvados/v1/containers", nil, params)
+		if err != nil {
+			return nil, err
+		}
+		if len(list.Items) == 0 {
+			break
+		}
+
+		// Append the page's elements ("..." was missing, which
+		// appended the slice itself).
+		results = append(results, list.Items...)
+		if len(params.Order) == 1 && params.Order[0] == "uuid" {
+			params.Filters = append(initialParams.Filters, arvados.Filter{"uuid", ">", list.Items[len(list.Items)-1].UUID})
+		} else {
+			params.Offset += len(list.Items)
+		}
+	}
+	return results, nil
+}
diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
new file mode 100644
index 000000000..90e2c799f
--- /dev/null
+++ b/lib/dispatchcloud/dispatcher.go
@@ -0,0 +1,61 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+	"net/http"
+	"sync"
+
+	"git.curoverse.com/arvados.git/lib/cloud"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"github.com/Sirupsen/logrus"
+)
+
+// dispatcher glues together the container queue, scheduler, worker
+// pool, and cloud provider, and serves the dispatcher's HTTP status
+// endpoint.
+//
+// NOTE(review): cloud.Provider, workerPool, scheduler, syncer, and
+// staleLockFixer are not defined in this change -- sketch code.
+type dispatcher struct {
+	Cluster     *arvados.Cluster
+	NodeProfile *arvados.NodeProfile
+
+	logger         logger
+	provider       cloud.Provider
+	workerPool     workerPool
+	queue          containerQueue
+	scheduler      scheduler
+	syncer         syncer
+	staleLockFixer staleLockFixer
+	httpHandler    http.Handler
+	setupOnce      sync.Once // guards setup(); referenced by ServeHTTP and CheckHealth
+}
+
+// ServeHTTP implements service.Handler, delegating to the mux built
+// by setup().
+func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	disp.setupOnce.Do(disp.setup)
+	// The struct field is httpHandler; there is no disp.handler.
+	disp.httpHandler.ServeHTTP(w, r)
+}
+
+// CheckHealth implements service.Handler. Running setup (exactly
+// once) is the only check performed; there is no deeper probe yet.
+func (disp *dispatcher) CheckHealth() error {
+	disp.setupOnce.Do(disp.setup)
+	return nil
+}
+
+// setup wires up the dispatcher's components and its HTTP mux. It is
+// intended to run at most once, via setupOnce.
+//
+// NOTE(review): several names used below (providerProxy, newProvider,
+// serveStatusJSON, and the un-keyed struct literals' target types)
+// are not defined in this change -- sketch code; confirm before
+// building on it.
+func (disp *dispatcher) setup() {
+	disp.logger = logrus.StandardLogger()
+	disp.provider = &providerProxy{disp.logger, newProvider(disp.Cluster)}
+	disp.workerPool = &workerPool{disp.logger, disp.provider}
+	disp.queue = &containerQueue{disp.logger, disp.Cluster}
+	disp.scheduler = &scheduler{disp.logger, disp.queue, disp.workerPool}
+	disp.syncer = &syncer{disp.logger, disp.queue, disp.workerPool}
+	disp.staleLockFixer = &staleLockFixer{disp.logger, disp.queue, disp.workerPool}
+
+	go func() {
+		disp.workerPool.Start()
+		// staleLockFixer must be ready before scheduler can start
+		disp.staleLockFixer.Wait()
+		go disp.scheduler.Run()
+		go disp.syncer.Run()
+	}()
+
+	mux := http.NewServeMux()
+	// NOTE(review): mux.Handle takes an http.Handler; if
+	// serveStatusJSON is a method with a handler-func signature,
+	// this likely wants http.HandlerFunc(disp.serveStatusJSON).
+	mux.Handle("/status.json", disp.serveStatusJSON)
+	disp.httpHandler = mux
+}
diff --git a/lib/dispatchcloud/logger.go b/lib/dispatchcloud/logger.go
new file mode 100644
index 000000000..90bb6ca68
--- /dev/null
+++ b/lib/dispatchcloud/logger.go
@@ -0,0 +1,29 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+ "sync"
+ "time"
+)
+
+// logger is the minimal logging interface this package needs; it is
+// satisfied by logrus-style loggers.
+type logger interface {
+	Printf(string, ...interface{})
+	Warnf(string, ...interface{})
+	Debugf(string, ...interface{})
+}
+
+// nextSpam records, per message, the earliest time it may be logged
+// again; nextSpamMtx guards it.
+var nextSpam = map[string]time.Time{}
+var nextSpamMtx sync.Mutex
+
+// unspam reports whether msg may be logged now. Each distinct msg is
+// allowed through at most once per minute; repeated calls inside that
+// window return false.
+func unspam(msg string) bool {
+	nextSpamMtx.Lock()
+	defer nextSpamMtx.Unlock()
+	if !nextSpam[msg].Before(time.Now()) {
+		return false
+	}
+	nextSpam[msg] = time.Now().Add(time.Minute)
+	return true
+}
diff --git a/lib/dispatchcloud/readme.go b/lib/dispatchcloud/readme.go
new file mode 100644
index 000000000..a4b005eb8
--- /dev/null
+++ b/lib/dispatchcloud/readme.go
@@ -0,0 +1,79 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+// A dispatcher comprises a container queue, a scheduler, a worker
+// pool, a cloud provider, a stale-lock fixer, and a syncer.
+// 1. Choose a provider.
+// 2. Start a worker pool.
+// 3. Start a container queue.
+// 4. Run a stale-lock fixer.
+// 5. Start a scheduler.
+// 6. Start a syncer.
+//
+//
+// A provider (cloud driver) creates new cloud VM instances and gets
+// the latest list of instances. The returned instances implement
+// proxies to the provider's metadata and control interfaces (get IP
+// address, update tags, shutdown).
+//
+//
+// A workerPool tracks workers' instance types and readiness states
+// (available to do work now, booting, suffering a temporary network
+// outage, shutting down). It loads internal state from the cloud
+// provider's list of instances at startup, and syncs periodically
+// after that.
+//
+//
+// A worker maintains a multiplexed SSH connection to a cloud
+// instance, retrying/reconnecting as needed, so the workerPool can
+// execute commands. It asks the provider's instance to verify its SSH
+// public key once when first connecting, and again later if the key
+// changes.
+//
+//
+// A container queue tracks the known state (according to
+// arvados-controller) of each container of interest -- i.e., queued,
+// or locked/running using our own dispatch token. It also proxies the
+// dispatcher's lock/unlock/cancel requests to the controller. It
+// handles concurrent refresh and update operations without exposing
+// out-of-order updates to its callers. (It drops any new information
+// that might have originated before its own most recent
+// lock/unlock/cancel operation.)
+//
+//
+// A stale-lock fixer waits for any already-locked containers (i.e.,
+// locked by a prior server process) to appear on workers as the
+// worker pool recovers its state. It unlocks/requeues any that still
+// remain when all workers are recovered or shutdown, or its timer
+// expires.
+//
+//
+// A scheduler chooses which containers to assign to which idle
+// workers, and decides what to do when there are not enough idle
+// workers (including shutting down some idle nodes).
+//
+//
+// A syncer updates state to Cancelled when a running container
+// process dies without finalizing its entry in the controller
+// database. It also calls the worker pool to kill containers that
+// have priority=0 while locked or running.
+//
+//
+// A provider proxy wraps a provider with rate-limiting logic. After
+// the wrapped provider receives a cloud.RateLimitError, the proxy
+// starts returning errors to callers immediately without calling
+// through to the wrapped provider.
+//
+//
+// TBD: Bootstrapping script via SSH, too? Future version.
+//
+// TBD: drain instance, keep instance alive
+// TBD: metrics, diagnostics
+// TBD: why dispatch token currently passed to worker?
+//
+// Metrics: queue size, time each job has spent queued, #idle/busy/booting nodes
+// Timing in each step, and end-to-end
+// Metrics: boot/idle/alloc time and cost
diff --git a/lib/dispatchcloud/scheduler.go b/lib/dispatchcloud/scheduler.go
new file mode 100644
index 000000000..a7dd3f244
--- /dev/null
+++ b/lib/dispatchcloud/scheduler.go
@@ -0,0 +1,155 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+ "sort"
+ "sync"
+ "time"
+
+ "git.curoverse.com/arvados.git/lib/cloud"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// container pairs an arvados container with the instance type it
+// needs to run on (as chosen by ChooseInstanceType).
+type container struct {
+	container arvados.Container
+	wantsType arvados.InstanceType
+}
+
+// String returns the container's UUID, for log messages.
+func (c *container) String() string {
+	return c.container.UUID
+}
+
+// A scheduler assigns queued containers to available workers, and
+// creates new workers when there aren't enough available.
+//
+// If it encounters problems creating new workers, the scheduler also
+// shuts down idle workers in case they are consuming quota.
+// Otherwise, workers are responsible for shutting themselves down
+// after the configured idle time threshold.
+type scheduler struct {
+	logger
+	containerQueue
+	workerPool
+
+	queue []*container // scratch slice, rebuilt each pass in run()
+	queueMtx sync.Mutex
+	instances map[cloud.Instance]struct{}
+	enqueueOrUpdate chan container // feed of new/updated containers (see DispatchFunc)
+	runOnce sync.Once // ensures run() is started at most once via DispatchFunc
+}
+
+// try makes one scheduling pass: for each runnable queued container,
+// in priority order, it ensures enough workers of the needed instance
+// type exist or are being created.
+//
+// NOTE(review): sched.Cluster, sched.containerQueue.Current, and the
+// workerPool Pending/Create methods are not defined in this change;
+// confirm their signatures before building on this.
+func (sched *scheduler) try() {
+	ctrs := sched.containerQueue.Current()
+	queue := make([]arvados.Container, 0, len(ctrs))
+	// The map key (UUID) is unused here; ranging with a single
+	// variable avoids a declared-and-not-used error.
+	for _, ctr := range ctrs {
+		queue = append(queue, ctr)
+	}
+	sort.Slice(queue, func(i, j int) bool {
+		// Highest priority first. (The comparator must return
+		// its result -- the original dropped the "return".)
+		return queue[i].Priority > queue[j].Priority
+	})
+	// dibs counts instances this pass has already claimed, per
+	// type; create another only when claims exceed what's pending.
+	dibs := map[arvados.InstanceType]int{}
+	for _, ctr := range queue {
+		switch {
+		case ctr.State == arvados.ContainerStateQueued && ctr.Priority > 0:
+			it, err := ChooseInstanceType(sched.Cluster, &ctr)
+			if err != nil {
+				sched.logger.Warnf("cannot run %s", &ctr)
+				continue
+			}
+
+			if dibs[it] >= sched.workerPool.Pending(it) {
+				err := sched.workerPool.Create(it)
+				if err != nil {
+					// Rate-limit repeated identical
+					// create-failure log lines.
+					if unspam(err.Error()) {
+						sched.logger.Warnf("scheduler: workerPool.Create: %s", err)
+					}
+					return
+				}
+			}
+			dibs[it]++
+			// ...
+		case ctr.State == arvados.ContainerStateLocked || ctr.State == arvados.ContainerStateRunning:
+			// ...
+		}
+	}
+}
+
+// setup initializes the dispatch channel and starts the scheduler
+// loop in the background.
+// NOTE(review): run() is also started via runOnce in DispatchFunc;
+// calling both setup() and DispatchFunc would start two run() loops.
+func (sched *scheduler) setup() {
+	sched.enqueueOrUpdate = make(chan container, 1)
+	go sched.run()
+}
+
+// run is the scheduler's main loop: it keeps a priority-sorted queue
+// of locked containers and hands the highest-priority one to an idle
+// worker whenever one is available.
+//
+// NOTE(review): sketch code. "todo" is never defined, "workers" is
+// unused, and the direct field accesses (ctr.UUID, ctr.State,
+// ctr.Priority) need to go through ctr.container. Kept verbatim
+// pending a real implementation.
+func (sched *scheduler) run() {
+	wakeup := make(chan struct{}, 1)
+	workers := map[*worker]*container{}
+	ctrs := map[string]*container{} // key is container UUID
+	timer := time.NewTimer(time.Second)
+	queue := []*container{}
+	for {
+		select {
+		case ctr := <-sched.enqueueOrUpdate:
+			// Get a newly queued container, or update
+			// priority/state.
+			if ctrs[ctr.UUID] == nil {
+				ctrs[ctr.UUID] = &ctr
+			} else {
+				ctrs[ctr.UUID].container = ctr.container
+				continue
+			}
+		case <-timer.C:
+		case <-wakeup:
+		}
+
+		// Rebuild the runnable queue from scratch each pass.
+		queue = queue[:0]
+		for _, ctr := range ctrs {
+			if ctr.State == arvados.ContainerStateLocked {
+				queue = append(queue, ctr)
+			}
+		}
+		sort.Slice(queue, func(i, j int) bool {
+			if d := queue[i].Priority - queue[j].Priority; d != 0 {
+				return d > 0
+			} else {
+				return queue[i].UUID > queue[j].UUID
+			}
+		})
+
+		// Dispatch highest priority container to the idle
+		// worker with the shortest idle time.
+		for len(queue) > 0 {
+			select {
+			case todo[queue[0].wantsType] <- queue[0]:
+				queue = queue[1:]
+				continue
+			default:
+			}
+			break
+		}
+
+		// Compare queue to booting.
+	}
+}
+
+// DispatchFunc returns a dispatch.DispatchFunc that feeds each
+// container (and its subsequent updates) into the scheduler loop,
+// starting the loop on first use.
+func (sched *scheduler) DispatchFunc(cluster *arvados.Cluster) func(actr arvados.Container, update <-chan arvados.Container) {
+	go sched.runOnce.Do(sched.run)
+	return func(actr arvados.Container, update <-chan arvados.Container) {
+		it, err := ChooseInstanceType(cluster, &actr)
+		if err != nil {
+			// The closure has no error result; log and drop
+			// the container instead of "return err".
+			sched.logger.Warnf("DispatchFunc: ChooseInstanceType(%s): %s", actr.UUID, err)
+			return
+		}
+		sched.enqueueOrUpdate <- container{
+			container: actr,
+			wantsType: it,
+		}
+		for actr := range update {
+			sched.enqueueOrUpdate <- container{
+				container: actr,
+				wantsType: it,
+			}
+		}
+	}
+}
diff --git a/lib/dispatchcloud/test/lame_provider.go b/lib/dispatchcloud/test/lame_provider.go
new file mode 100644
index 000000000..996a63821
--- /dev/null
+++ b/lib/dispatchcloud/test/lame_provider.go
@@ -0,0 +1,118 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package test
+
+import (
+ "fmt"
+ "math/rand"
+ "sync"
+
+ "git.curoverse.com/arvados.git/lib/cloud"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "golang.org/x/crypto/ssh"
+)
+
+// LameInstanceSet creates instances that boot but can't run
+// containers.
+type LameInstanceSet struct {
+	Hold chan bool // set to make(chan bool) to hold operations until Release is called
+
+	mtx sync.Mutex // guards instances
+	instances map[*lameInstance]bool // current (not yet destroyed) instances
+}
+
+// Create builds and registers a new fake instance of the given type.
+// If p.Hold is non-nil, the call blocks (after tagging) until Release
+// lets it through.
+func (p *LameInstanceSet) Create(instType arvados.InstanceType, imageID cloud.ImageID, tags cloud.InstanceTags, pubkey ssh.PublicKey) (cloud.Instance, error) {
+	newInst := &lameInstance{
+		p:            p,
+		id:           cloud.InstanceID(fmt.Sprintf("lame-%x", rand.Uint64())),
+		providerType: instType.ProviderType,
+	}
+	newInst.SetTags(tags)
+	if p.Hold != nil {
+		p.Hold <- true
+	}
+	p.mtx.Lock()
+	defer p.mtx.Unlock()
+	// Lazily create the registry so the zero value of
+	// LameInstanceSet is usable.
+	if p.instances == nil {
+		p.instances = map[*lameInstance]bool{}
+	}
+	p.instances[newInst] = true
+	return newInst, nil
+}
+
+// Instances returns the instances that haven't been destroyed. The
+// tags argument is ignored.
+func (p *LameInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
+	p.mtx.Lock()
+	defer p.mtx.Unlock()
+	var live []cloud.Instance
+	for inst := range p.instances {
+		live = append(live, inst)
+	}
+	return live, nil
+}
+
+// Stop is a no-op, but exists to satisfy cloud.InstanceSet.
+func (p *LameInstanceSet) Stop() {
+}
+
+// Release n held calls. Blocks if n calls aren't already
+// waiting. Blocks forever if Hold is nil (receiving from a nil
+// channel never proceeds).
+func (p *LameInstanceSet) Release(n int) {
+	for i := 0; i < n; i++ {
+		<-p.Hold
+	}
+}
+
+// lameInstance is the cloud.Instance implementation returned by
+// LameInstanceSet.Create.
+type lameInstance struct {
+	p *LameInstanceSet
+	id cloud.InstanceID
+	providerType string
+	tags cloud.InstanceTags
+}
+
+// ID returns the fake instance's stable identifier.
+func (inst *lameInstance) ID() cloud.InstanceID {
+	return inst.id
+}
+
+// String returns the instance ID, for log messages.
+func (inst *lameInstance) String() string {
+	return fmt.Sprint(inst.id)
+}
+
+// ProviderType returns the provider type given to Create.
+func (inst *lameInstance) ProviderType() string {
+	return inst.providerType
+}
+
+// Address returns a fixed, non-working SSH address -- this instance
+// is "lame" by design.
+func (inst *lameInstance) Address() string {
+	return "0.0.0.0:1234"
+}
+
+// SetTags replaces the instance's tags with a copy of the given map,
+// so later changes to the caller's map don't leak in.
+func (inst *lameInstance) SetTags(tags cloud.InstanceTags) error {
+	inst.p.mtx.Lock()
+	defer inst.p.mtx.Unlock()
+	copied := cloud.InstanceTags{}
+	for key, val := range tags {
+		copied[key] = val
+	}
+	inst.tags = copied
+	return nil
+}
+
+// Destroy unregisters the instance from its InstanceSet. If Hold is
+// non-nil, the call blocks until Release lets it through.
+func (inst *lameInstance) Destroy() error {
+	if inst.p.Hold != nil {
+		inst.p.Hold <- true
+	}
+	inst.p.mtx.Lock()
+	defer inst.p.mtx.Unlock()
+	delete(inst.p.instances, inst)
+	return nil
+}
+
+// Tags returns the instance's tags.
+// NOTE(review): this read is not protected by p.mtx, unlike SetTags;
+// confirm whether concurrent access is expected.
+func (inst *lameInstance) Tags() cloud.InstanceTags {
+	return inst.tags
+}
+
+// VerifyPublicKey always succeeds.
+func (inst *lameInstance) VerifyPublicKey(ssh.PublicKey, *ssh.Client) error {
+	return nil
+}
diff --git a/lib/dispatchcloud/worker/gocheck_test.go b/lib/dispatchcloud/worker/gocheck_test.go
new file mode 100644
index 000000000..b4ca66c97
--- /dev/null
+++ b/lib/dispatchcloud/worker/gocheck_test.go
@@ -0,0 +1,16 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+ "testing"
+
+ check "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+ check.TestingT(t)
+}
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
new file mode 100644
index 000000000..43412bf6e
--- /dev/null
+++ b/lib/dispatchcloud/worker/pool.go
@@ -0,0 +1,332 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+ "sync"
+ "time"
+
+ "git.curoverse.com/arvados.git/lib/cloud"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "github.com/Sirupsen/logrus"
+)
+
// A Pool tracks workers (cloud instances) and their states, creating
// and destroying instances via an InstanceSet as needed.
//
// The exported fields must be set before any method is called. The
// unexported fields are initialized lazily by setup() on first use.
type Pool struct {
	Cluster     *arvados.Cluster
	Logger      logrus.FieldLogger
	InstanceSet cloud.InstanceSet
	ImageID     cloud.ImageID

	subscribers map[<-chan struct{}]chan<- struct{} // notification channels, keyed by their receive end
	creating    map[arvados.InstanceType]int        // goroutines waiting for (InstanceSet)Create to return
	workers     map[cloud.InstanceID]*worker
	loaded      bool      // loaded list of instances from InstanceSet at least once
	stop        chan bool // closed by Stop() to terminate SyncLoop
	mtx         sync.RWMutex
	setupOnce   sync.Once
}
+
+// Subscribe returns a channel that becomes ready whenever a worker's
+// state changes.
+//
+// Example:
+//
+// ch := wp.Subscribe()
+// for range ch {
+// // some worker has become available; try scheduling some work
+// if wantStop {
+// wp.Unsubscribe(ch)
+// break
+// }
+// }
+func (wp *Pool) Subscribe() <-chan struct{} {
+ wp.setupOnce.Do(wp.setup)
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
+ ch := make(chan struct{}, 1)
+ wp.subscribers[ch] = ch
+ return ch
+}
+
+// Unsubscribe stops sending updates to the given channel.
+func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
+ wp.setupOnce.Do(wp.setup)
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
+ delete(wp.subscribers, ch)
+}
+
+// Unallocated returns the number of unallocated (booting + idle +
+// unknown) instances of the given type.
+func (wp *Pool) Unallocated(it arvados.InstanceType) int {
+ wp.setupOnce.Do(wp.setup)
+ wp.mtx.RLock()
+ defer wp.mtx.RUnlock()
+ n := 0
+ for _, wkr := range wp.workers {
+ if wkr.instType == it && len(wkr.running) == 0 && (wkr.state == StateRunning || wkr.state == StateBooting || wkr.state == StateUnknown) {
+ n++
+ }
+ }
+ return n + wp.creating[it]
+}
+
+// Create a new instance with the given type, and add it to the worker
+// pool. The worker is added immediately; instance creation runs in
+// the background.
+func (wp *Pool) Create(it arvados.InstanceType) error {
+ wp.setupOnce.Do(wp.setup)
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
+ tags := cloud.InstanceTags{"InstanceType": it.Name}
+ wp.creating[it]++
+ go func() {
+ inst, err := wp.InstanceSet.Create(it, wp.ImageID, tags, nil)
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
+ wp.creating[it]--
+ if err != nil {
+ wp.Logger.Errorf("worker.Pool: create instance: %s", err)
+ go wp.notify()
+ return
+ }
+ wp.updateWorker(inst, it, StateBooting)
+ }()
+ return nil
+}
+
// updateWorker records the latest cloud.Instance object for a worker.
// Caller must hold wp.mtx.
//
// If a worker already exists for the instance's ID, only its instance
// and updated fields are refreshed; state and instType are left
// alone. Otherwise a new worker is added in the given state and
// subscribers are notified.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, state State) {
	id := inst.ID()
	if wp.workers[id] != nil {
		// Existing worker: refresh the instance object and bump
		// updated so sync() doesn't treat it as disappeared.
		wp.workers[id].instance = inst
		wp.workers[id].updated = time.Now()
		return
	}
	wp.Logger.Debugf("worker.Pool: instance %q appeared with InstanceType %q -- adding with state %q", inst, it.Name, state)
	wp.workers[id] = &worker{
		logger:   wp.Logger,
		state:    state,
		instance: inst,
		instType: it,
		busy:     time.Now(),
		updated:  time.Now(),
	}
	// Notify in a new goroutine: notify() takes wp.mtx, which the
	// caller already holds.
	go wp.notify()
}
+
// Shutdown shuts down an unallocated worker with the given instance
// type, or returns false if every worker with that type is running
// containers or already shutting down.
//
// Booting workers are tried before running ones, so an instance that
// hasn't finished booting is sacrificed first.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.Logger.Debugf("worker.Pool: Shutdown(%s)", it.Name)
	for _, tryState := range []State{StateBooting, StateRunning} {
		for _, wkr := range wp.workers {
			if wkr.state != tryState || len(wkr.running) > 0 {
				continue
			}
			if wkr.instType != it {
				continue
			}
			// Destroy and notify in new goroutines: Destroy can
			// block, and notify() takes wp.mtx, which we hold.
			go wkr.instance.Destroy()
			go wp.notify()
			wkr.state = StateShutdown
			wkr.updated = time.Now()
			return true
		}
	}
	return false
}
+
// StartContainer starts a container on an idle worker of the given
// instance type immediately if possible, otherwise returns false.
//
// Among eligible workers, the most recently busy one is chosen,
// leaving longer-idle workers free to be shut down.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	var wkr *worker
	for _, w := range wp.workers {
		if w.instType == it && w.state == StateRunning && len(w.running) == 0 && !w.starting {
			if wkr == nil || w.busy.After(wkr.busy) {
				wkr = w
			}
		}
	}
	if wkr == nil {
		return false
	}
	wp.Logger.Debugf("worker.Pool: start container %s on instance %s", ctr.UUID, wkr.instance)
	// Mark the worker as starting (so concurrent calls don't pick
	// it again) before kicking off the SSH command in the
	// background.
	wkr.starting = true
	wkr.updated = time.Now()
	go func() {
		_, stderr, err := wkr.execute("crunch-run '"+ctr.UUID+"' &", nil)
		if err != nil {
			wp.Logger.Errorf("worker.Pool: error starting container %s on instance %s: %s (stderr %q)", ctr.UUID, wkr.instance, err, stderr)
		}
		wp.mtx.Lock()
		if err == nil {
			wkr.running = append(wkr.running, ctr.UUID)
			wkr.updated = time.Now()
		}
		wkr.starting = false
		wp.mtx.Unlock()
	}()
	return true
}
+
// Stop synchronizing with the InstanceSet: closing the stop channel
// terminates SyncLoop. Stop must not be called more than once --
// closing an already-closed channel panics.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)
	close(wp.stop)
}
+
+func (wp *Pool) setup() {
+ wp.creating = map[arvados.InstanceType]int{}
+ wp.workers = map[cloud.InstanceID]*worker{}
+ wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
+}
+
// notify does a non-blocking send to every subscriber channel. Each
// channel has capacity 1, so a slow subscriber ends up with at most
// one pending (coalesced) notification.
func (wp *Pool) notify() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for _, send := range wp.subscribers {
		select {
		case send <- struct{}{}:
		default:
			// Subscriber already has a notification pending.
		}
	}
}
+
+func (wp *Pool) SyncLoop() {
+ wp.setupOnce.Do(wp.setup)
+ wait := time.Nanosecond
+ timer := time.NewTimer(time.Second)
+ for {
+ if !timer.Stop() {
+ <-timer.C
+ }
+ timer.Reset(wait)
+ wp.Logger.Debugf("worker.Pool: wait %s", wait)
+ select {
+ case <-timer.C:
+ case <-wp.stop:
+ return
+ }
+ err := wp.Sync()
+ if err != nil {
+ wp.Logger.Warnf("worker.Pool: error getting instance list: %s", err)
+ wait = 15 * time.Second
+ } else {
+ wait = time.Minute
+ }
+ }
+}
+
// Sync fetches the current instance list from the InstanceSet once
// and reconciles the worker pool with it (see sync).
func (wp *Pool) Sync() error {
	wp.setupOnce.Do(wp.setup)
	wp.Logger.Debugf("worker.Pool: getting instance list")
	// Workers not updated after this point and absent from the
	// fetched list are treated as gone (see sync).
	threshold := time.Now()
	instances, err := wp.InstanceSet.Instances(cloud.InstanceTags{})
	if err != nil {
		return err
	}
	wp.sync(threshold, instances)
	return nil
}
+
// sync reconciles the worker map with a just-fetched instance list.
// threshold is a time taken just before the list was fetched: any
// worker whose updated timestamp is not after threshold did not
// appear in the list (updateWorker bumps updated) and is presumed
// gone.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()

	for _, inst := range instances {
		itTag := inst.Tags()["InstanceType"]
		it, ok := wp.Cluster.InstanceTypes[itTag]
		if !ok {
			// Errorf if this is an instance we were already
			// tracking (its type was known before), Debugf
			// otherwise.
			log := wp.Logger.Debugf
			if wp.workers[inst.ID()] != nil {
				log = wp.Logger.Errorf
			}
			log("worker.Pool: instance %q has unknown InstanceType tag %q --- ignoring", inst, itTag)
			continue
		}
		// Adds a new worker in StateUnknown, or just refreshes
		// an existing worker's updated timestamp.
		wp.updateWorker(inst, it, StateUnknown)
	}

	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		wp.Logger.Infof("worker.Pool: instance %q disappeared, shutting down worker with state %q", wkr.instance, wkr.state)
		wkr.state = StateShutdown
		delete(wp.workers, id)
		// Destroy and notify in new goroutines: Destroy can
		// block, and notify() takes wp.mtx, which we hold.
		go wkr.instance.Destroy()
		go wp.notify()
	}

	if !wp.loaded {
		wp.loaded = true
		wp.Logger.Infof("worker.Pool: loaded initial set of instances (%d) from InstanceSet", len(wp.workers))
	}
}
+
// probeAndUpdate probes one worker -- the boot probe until the worker
// has booted, then "crunch-run --probe" -- and updates the worker's
// state from the result. It should be called in a new goroutine: the
// probes are slow SSH round-trips, done without holding wp.mtx.
func (wp *Pool) probeAndUpdate(wkr *worker) {
	// Snapshot the fields we read while unlocked.
	wp.mtx.Lock()
	updated := wkr.updated
	booted := wkr.booted
	wp.mtx.Unlock()

	var err error
	if !booted {
		err = wkr.probeBooted(wp.Cluster.CloudVMs.BootProbeCommand)
		if err == nil {
			booted = true
			wp.Logger.Infof("worker.Pool: instance %q booted", wkr.instance)
		}
	}
	var running []string
	if err == nil && booted {
		running, err = wkr.probeRunning()
	}
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	defer func() { wkr.updated = time.Now() }()
	wkr.booted = booted
	if err != nil {
		// Probe failed. If the worker has been unresponsive
		// since its last successful probe for longer than the
		// applicable threshold (more generous while booting),
		// shut it down.
		if wkr.state != StateShutdown {
			elapsed := time.Since(wkr.probed)
			wp.Logger.Infof("worker.Pool: instance %q not responding for %s: %s", wkr.instance, elapsed, err)

			label, threshold := "", maxPingFailTime
			if wkr.state == StateBooting {
				label, threshold = "new ", maxBootTime
			}
			if elapsed > threshold {
				wp.Logger.Warnf("worker.Pool: %sinstance %q unresponsive since %s; shutting down", label, wkr.instance, wkr.probed)
				wkr.state = StateShutdown
				go wkr.instance.Destroy()
				go wp.notify()
			}
		}
		return
	}
	wkr.probed = time.Now()
	if len(running) > 0 {
		wkr.busy = time.Now()
	}
	// Don't resurrect a worker that was shut down while we were
	// probing.
	if wkr.state == StateShutdown {
	} else if booted {
		wkr.state = StateRunning
	} else {
		wkr.state = StateBooting
	}
	if updated == wkr.updated {
		// We haven't started any new work since starting the
		// probe, so this is the latest available information.
		wkr.running = running
	}
	go wp.notify()
}
diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
new file mode 100644
index 000000000..4d552aa66
--- /dev/null
+++ b/lib/dispatchcloud/worker/pool_test.go
@@ -0,0 +1,108 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+ "time"
+
+ "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "github.com/Sirupsen/logrus"
+ check "gopkg.in/check.v1"
+)
+
// GiB is 2^30 bytes, for readable instance-type definitions below.
const GiB arvados.ByteSize = 1 << 30

var _ = check.Suite(&PoolSuite{})

type PoolSuite struct{}
+
// TestCreateUnallocShutdown exercises Create, Unallocated, Sync, and
// Shutdown against a LameInstanceSet, checking that subscribers are
// notified along the way.
func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
	logrus.StandardLogger().SetLevel(logrus.DebugLevel)
	// Hold makes each Create/Destroy block until Release is called.
	provider := &test.LameInstanceSet{Hold: make(chan bool)}
	type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
	type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .01}
	pool := &Pool{
		Cluster: &arvados.Cluster{
			InstanceTypes: arvados.InstanceTypeMap{
				type1.Name: type1,
				type2.Name: type2,
			},
			CloudVMs: arvados.CloudVMs{
				BootProbeCommand: "true",
			},
		},
		Logger:      logrus.StandardLogger(),
		InstanceSet: provider,
	}
	notify := pool.Subscribe()
	defer pool.Unsubscribe(notify)
	notify2 := pool.Subscribe()
	defer pool.Unsubscribe(notify2)

	c.Check(pool.Unallocated(type1), check.Equals, 0)
	c.Check(pool.Unallocated(type2), check.Equals, 0)
	pool.Create(type2)
	pool.Create(type1)
	pool.Create(type2)
	// Pending Create calls count as unallocated immediately.
	c.Check(pool.Unallocated(type1), check.Equals, 1)
	c.Check(pool.Unallocated(type2), check.Equals, 2)
	// Unblock the pending Create calls and (before calling Sync!)
	// wait for the pool to process the returned instances.
	go provider.Release(3)
	suite.wait(c, pool, notify, func() bool {
		list, err := provider.Instances(nil)
		return err == nil && len(list) == 3
	})

	c.Check(pool.Unallocated(type1), check.Equals, 1)
	c.Check(pool.Unallocated(type2), check.Equals, 2)
	// Sync must not change anything: all three instances are
	// already known to the pool.
	pool.Sync()
	c.Check(pool.Unallocated(type1), check.Equals, 1)
	c.Check(pool.Unallocated(type2), check.Equals, 2)

	c.Check(pool.Shutdown(type2), check.Equals, true)
	suite.wait(c, pool, notify, func() bool {
		return pool.Unallocated(type1) == 1 && pool.Unallocated(type2) == 1
	})
	c.Check(pool.Shutdown(type2), check.Equals, true)
	suite.wait(c, pool, notify, func() bool {
		return pool.Unallocated(type1) == 1 && pool.Unallocated(type2) == 0
	})
	// No type2 workers left to shut down.
	c.Check(pool.Shutdown(type2), check.Equals, false)
	for {
		// Consume any waiting notifications to ensure the
		// next one we get is from Shutdown.
		select {
		case <-notify:
			continue
		default:
		}
		break
	}
	c.Check(pool.Shutdown(type1), check.Equals, true)
	suite.wait(c, pool, notify, func() bool {
		return pool.Unallocated(type1) == 0 && pool.Unallocated(type2) == 0
	})
	// The second subscriber must have been notified too.
	select {
	case <-notify2:
	case <-time.After(time.Second):
		c.Error("notify did not receive")
	}
	go provider.Release(3) // unblock Destroy calls
}
+
+func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) {
+ timeout := time.NewTimer(time.Second).C
+ for !ready() {
+ select {
+ case <-notify:
+ continue
+ case <-timeout:
+ }
+ break
+ }
+ c.Check(ready(), check.Equals, true)
+}
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
new file mode 100644
index 000000000..7f3e20e2b
--- /dev/null
+++ b/lib/dispatchcloud/worker/worker.go
@@ -0,0 +1,178 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+ "bytes"
+ "errors"
+ "io"
+ "net"
+ "strings"
+ "sync"
+ "time"
+
+ "git.curoverse.com/arvados.git/lib/cloud"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "github.com/Sirupsen/logrus"
+ "golang.org/x/crypto/ssh"
+)
+
// State indicates whether a worker is available to run containers.
type State int

const (
	StateUnknown  State = iota // might be running a container already
	StateBooting               // instance is booting
	StateRunning               // instance is running
	StateShutdown              // worker has stopped monitoring the instance

	// TODO: configurable
	maxPingFailTime = 10 * time.Minute // shut down a worker that has been unresponsive this long
	maxBootTime     = 20 * time.Minute // shut down a new instance that hasn't booted within this long
)

var stateString = map[State]string{
	StateUnknown:  "unknown",
	StateBooting:  "booting",
	StateRunning:  "running",
	StateShutdown: "shutdown",
}

// String implements fmt.Stringer.
func (s State) String() string {
	return stateString[s]
}
+
// worker tracks the state of one cloud instance and provides SSH
// access to it.
//
// The first group of mutable fields is protected by the pool's mtx;
// the SSH client fields are coordinated via clientOnce/clientSetup
// (see sshClient).
type worker struct {
	logger   logrus.FieldLogger
	state    State
	instance cloud.Instance
	instType arvados.InstanceType
	booted   bool      // instance has passed its boot probe
	probed   time.Time // time of last successful probe
	updated  time.Time // last time any of these fields changed
	busy     time.Time // last time containers were seen/started here
	running  []string  // UUIDs of containers believed to be running
	starting bool      // a StartContainer command is in flight

	client      *ssh.Client
	clientErr   error
	clientOnce  sync.Once // initializes clientSetup and clientErr
	clientSetup chan bool // 1-buffered channel used as a mutex for client setup
	publicKey   ssh.PublicKey
}
+
+// Create a new SSH session. If session setup fails or the SSH client
+// hasn't been setup yet, setup a new SSH client and try again.
+func (wkr *worker) newSession() (*ssh.Session, error) {
+ try := func(create bool) (*ssh.Session, error) {
+ client, err := wkr.sshClient(create)
+ if err != nil {
+ return nil, err
+ }
+ return client.NewSession()
+ }
+ session, err := try(false)
+ if err != nil {
+ session, err = try(true)
+ }
+ return session, err
+}
+
// sshClient returns the latest SSH client, first setting up a new one
// if create is true. If another goroutine is in the process of
// setting one up, wait for it to finish and return its result (or the
// last successfully set up client, if it fails).
//
// clientSetup is a 1-buffered channel used as a mutex: whichever
// goroutine's send succeeds takes the setup role, and the deferred
// receive releases it.
func (wkr *worker) sshClient(create bool) (*ssh.Client, error) {
	wkr.clientOnce.Do(func() {
		wkr.clientSetup = make(chan bool, 1)
		wkr.clientErr = errors.New("client not yet created")
	})
	defer func() { <-wkr.clientSetup }()
	select {
	case wkr.clientSetup <- true:
		if create {
			client, err := wkr.setupSSHClient()
			// On failure, keep an existing working client;
			// only record the error if we have no client at
			// all.
			if err == nil || wkr.client == nil {
				wkr.client, wkr.clientErr = client, err
			}
			if err != nil {
				return nil, err
			}
		}
	default:
		// Another goroutine is doing the above case. Wait
		// for it to finish and return whatever it leaves in
		// wkr.client.
		wkr.clientSetup <- true
	}
	return wkr.client, wkr.clientErr
}
+
// setupSSHClient dials a new SSH connection to the instance's address
// and has the instance verify the received host key.
//
// NOTE(review): user "root" and password "1234" are hard-coded --
// presumably placeholders in this sketch; confirm before real use.
func (wkr *worker) setupSSHClient() (*ssh.Client, error) {
	addr := wkr.instance.Address()
	if addr == "" {
		return nil, errors.New("instance has no address")
	}
	var receivedKey ssh.PublicKey
	client, err := ssh.Dial("tcp", addr, &ssh.ClientConfig{
		User: "root",
		Auth: []ssh.AuthMethod{
			ssh.Password("1234"),
		},
		// Capture the host key here; it is verified below via
		// the cloud instance rather than a known_hosts file.
		HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error {
			receivedKey = key
			return nil
		},
		Timeout: time.Minute,
	})
	if err != nil {
		return nil, err
	} else if receivedKey == nil {
		return nil, errors.New("BUG: key was never provided to HostKeyCallback")
	}

	// Skip verification if the key matches the one we already
	// verified on a previous connection.
	if wkr.publicKey == nil || !bytes.Equal(wkr.publicKey.Marshal(), receivedKey.Marshal()) {
		err = wkr.instance.VerifyPublicKey(receivedKey, client)
		if err != nil {
			return nil, err
		}
		wkr.publicKey = receivedKey
	}
	return client, nil
}
+
+func (wkr *worker) execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
+ session, err := wkr.newSession()
+ if err != nil {
+ return nil, nil, err
+ }
+ defer session.Close()
+ var stdout, stderr bytes.Buffer
+ session.Stdout = &stdout
+ session.Stderr = &stderr
+ if stdin != nil {
+ session.Stdin = stdin
+ }
+ wkr.logger.Debugf("worker: instance %s execute %q", wkr.instance, cmd)
+ err = session.Run(cmd)
+ return stdout.Bytes(), stderr.Bytes(), err
+}
+
+func (wkr *worker) probeRunning() (running []string, err error) {
+ stdout, _, err := wkr.execute("crunch-run --probe", nil)
+ if err != nil {
+ return
+ }
+ running = strings.Split(string(bytes.TrimRight(stdout, "\n")), "\n")
+ return
+}
+
+func (wkr *worker) probeBooted(cmd string) error {
+ if cmd == "" {
+ cmd = "true"
+ }
+ _, _, err := wkr.execute(cmd, nil)
+ return err
+}
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index 6edd18418..e5b4dead1 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -55,6 +55,7 @@ type Cluster struct {
ManagementToken string
NodeProfiles map[string]NodeProfile
InstanceTypes InstanceTypeMap
+ CloudVMs CloudVMs
HTTPRequestTimeout Duration
RemoteClusters map[string]RemoteCluster
PostgreSQL PostgreSQL
@@ -89,6 +90,16 @@ type InstanceType struct {
Preemptible bool
}
// CloudVMs configures the cloud-VM dispatcher.
type CloudVMs struct {
	// Shell command that exits zero if and only if the VM is
	// fully booted and ready to run containers, e.g., "mount |
	// grep /encrypted-tmp"
	BootProbeCommand string

	// Name of the cloud driver to use, and its driver-specific
	// configuration parameters.
	Driver           string
	DriverParameters map[string]interface{}
}
+
type InstanceTypeMap map[string]InstanceType
var errDuplicateInstanceTypeName = errors.New("duplicate instance type name")
diff --git a/sdk/go/arvados/container.go b/sdk/go/arvados/container.go
index 210ed9981..ef3547048 100644
--- a/sdk/go/arvados/container.go
+++ b/sdk/go/arvados/container.go
@@ -18,7 +18,7 @@ type Container struct {
Mounts map[string]Mount `json:"mounts"`
Output string `json:"output"`
OutputPath string `json:"output_path"`
- Priority int `json:"priority"`
+ Priority int64 `json:"priority"`
RuntimeConstraints RuntimeConstraints `json:"runtime_constraints"`
State ContainerState `json:"state"`
SchedulingParameters SchedulingParameters `json:"scheduling_parameters"`
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list