[ARVADOS] updated: 1.2.0-9-g7fef859a2
Git user
git at public.curoverse.com
Thu Aug 23 16:47:53 EDT 2018
Summary of changes:
build/run-tests.sh | 1 +
lib/cloud/interfaces.go | 7 +-
.../{worker_pool.go => worker/pool.go} | 180 ++++++++++++++-------
lib/dispatchcloud/worker/pool_test.go | 49 ++++++
4 files changed, 175 insertions(+), 62 deletions(-)
rename lib/dispatchcloud/{worker_pool.go => worker/pool.go} (65%)
create mode 100644 lib/dispatchcloud/worker/pool_test.go
discards 72c5f4651e75f0833f133550245f68405a7ad161 (commit)
via 7fef859a22f5c27a4e5e08a9f7c9559dc5daca3d (commit)
This update added new revisions after undoing existing revisions. That is
to say, the old revision is not a strict subset of the new revision. This
situation occurs when you --force push a change and generate a repository
containing something like this:
* -- * -- B -- O -- O -- O (72c5f4651e75f0833f133550245f68405a7ad161)
\
N -- N -- N (7fef859a22f5c27a4e5e08a9f7c9559dc5daca3d)
When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 7fef859a22f5c27a4e5e08a9f7c9559dc5daca3d
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Thu Aug 23 16:47:45 2018 -0400
13964: sketch
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/build/run-build-packages.sh b/build/run-build-packages.sh
index caebac013..11588913e 100755
--- a/build/run-build-packages.sh
+++ b/build/run-build-packages.sh
@@ -295,6 +295,8 @@ package_go_binary cmd/arvados-server arvados-server \
"Arvados server daemons"
package_go_binary cmd/arvados-server arvados-controller \
"Arvados cluster controller daemon"
+package_go_binary cmd/arvados-server arvados-dispatch-cloud \
+ "Arvados cluster cloud dispatch"
package_go_binary sdk/go/crunchrunner crunchrunner \
"Crunchrunner executes a command inside a container and uploads the output"
package_go_binary services/arv-git-httpd arvados-git-httpd \
diff --git a/build/run-tests.sh b/build/run-tests.sh
index e669e326c..93d7834a6 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -77,6 +77,7 @@ lib/cmd
lib/controller
lib/crunchstat
lib/dispatchcloud
+lib/dispatchcloud/worker
services/api
services/arv-git-httpd
services/crunchstat
diff --git a/cmd/arvados-server/arvados-dispatch-cloud.service b/cmd/arvados-server/arvados-dispatch-cloud.service
new file mode 100644
index 000000000..5ea5d45e7
--- /dev/null
+++ b/cmd/arvados-server/arvados-dispatch-cloud.service
@@ -0,0 +1,28 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+[Unit]
+Description=Arvados cloud dispatch
+Documentation=https://doc.arvados.org/
+After=network.target
+AssertPathExists=/etc/arvados/config.yml
+
+# systemd==229 (ubuntu:xenial) obeys StartLimitInterval in the [Unit] section
+StartLimitInterval=0
+
+# systemd>=230 (debian:9) obeys StartLimitIntervalSec in the [Unit] section
+StartLimitIntervalSec=0
+
+[Service]
+Type=notify
+EnvironmentFile=-/etc/arvados/environment
+ExecStart=/usr/bin/arvados-dispatch-cloud
+Restart=always
+RestartSec=1
+
+# systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section
+StartLimitInterval=0
+
+[Install]
+WantedBy=multi-user.target
diff --git a/cmd/arvados-server/cmd.go b/cmd/arvados-server/cmd.go
index 1af3745df..cd15d25dd 100644
--- a/cmd/arvados-server/cmd.go
+++ b/cmd/arvados-server/cmd.go
@@ -9,6 +9,7 @@ import (
"git.curoverse.com/arvados.git/lib/cmd"
"git.curoverse.com/arvados.git/lib/controller"
+ "git.curoverse.com/arvados.git/lib/dispatchcloud"
)
var (
@@ -18,7 +19,8 @@ var (
"-version": cmd.Version(version),
"--version": cmd.Version(version),
- "controller": controller.Command,
+ "controller": controller.Command,
+ "dispatch-cloud": dispatchcloud.Command,
})
)
diff --git a/lib/cloud/interfaces.go b/lib/cloud/interfaces.go
new file mode 100644
index 000000000..33b9e7fa9
--- /dev/null
+++ b/lib/cloud/interfaces.go
@@ -0,0 +1,90 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package cloud
+
+import (
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "golang.org/x/crypto/ssh"
+)
+
+// A RateLimitError should be returned by a Provider when the cloud
+// service indicates it is rejecting all API calls for some time
+// interval.
+type RateLimitError interface {
+ // Time before which the caller should expect requests to
+ // fail.
+ EarliestRetry() time.Time
+ error
+}
+
+// A QuotaError should be returned by a Provider when the cloud
+// service indicates the account cannot create more VMs than already
+// exist.
+type QuotaError interface {
+ // If true, don't create more instances until some existing
+ // instances are destroyed. If false, don't handle the error
+ // as a quota error.
+ IsQuotaError() bool
+ error
+}
+
+type InstanceTags map[string]string
+type InstanceID string
+type ImageID string
+
+// Instance is implemented by the provider-specific instance types.
+type Instance interface {
+ // ID returns the provider's instance ID. It must be stable
+ // for the life of the instance.
+ ID() InstanceID
+
+ // String typically returns the cloud-provided instance ID.
+ String() string
+
+ // Cloud provider's "instance type" ID. Matches a ProviderType
+ // in the cluster's InstanceTypes configuration.
+ ProviderType() string
+
+ // Get current tags
+ Tags() InstanceTags
+
+ // Replace tags with the given tags
+ SetTags(InstanceTags) error
+
+ // Shut down the node
+ Destroy() error
+
+ // SSH server hostname or IP address, or empty string if
+ // unknown while instance is booting.
+ Address() string
+
+ // Return nil if the given public key matches the instance's
+ // SSH server key. If the provided Dialer is not nil,
+ // VerifyPublicKey can use it to make outgoing network
+ // connections from the instance -- e.g., to use the cloud's
+ // "this instance's metadata" API.
+ VerifyPublicKey(ssh.PublicKey, *ssh.Client) error
+}
+
+type Provider interface {
+ // Create a new instance. If supported by the driver, add the
+ // provided public key to /root/.ssh/authorized_keys.
+ //
+ // The returned error should implement RateLimitError and
+ // QuotaError where applicable.
+ Create(arvados.InstanceType, ImageID, InstanceTags, ssh.PublicKey) (Instance, error)
+
+ // Return all instances, including ones that are booting or
+ // shutting down. Optionally, filter out nodes that don't have
+ // all of the given InstanceTags (the caller will ignore these
+ // anyway).
+ Instances(InstanceTags) ([]Instance, error)
+
+ Stop()
+}
+
+type ProviderFactory func(instancePoolID string) Provider
diff --git a/lib/dispatchcloud/cmd.go b/lib/dispatchcloud/cmd.go
new file mode 100644
index 000000000..b2bc91300
--- /dev/null
+++ b/lib/dispatchcloud/cmd.go
@@ -0,0 +1,17 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+ "git.curoverse.com/arvados.git/lib/cmd"
+ "git.curoverse.com/arvados.git/lib/service"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+var Command cmd.Handler = service.Command(arvados.ServiceNameDispatchCloud, newHandler)
+
+func newHandler(cluster *arvados.Cluster, np *arvados.NodeProfile) service.Handler {
+ return &dispatcher{Cluster: cluster, NodeProfile: np}
+}
diff --git a/lib/dispatchcloud/container_queue.go b/lib/dispatchcloud/container_queue.go
new file mode 100644
index 000000000..f8c0512b6
--- /dev/null
+++ b/lib/dispatchcloud/container_queue.go
@@ -0,0 +1,201 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+ "sync"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+type containerQueue struct {
+ Logger logger
+ Cluster *arvados.Cluster
+ Client *arvados.Client
+
+ current map[string]*arvados.Container
+ mtx sync.Mutex
+ keeplocal map[string]struct{}
+}
+
+func (cq *containerQueue) Forget(uuid string) {
+ cq.mtx.Lock()
+ defer cq.mtx.Unlock()
+ ctr := cq.current[uuid]
+ if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled {
+ delete(cq.current, uuid)
+ }
+}
+
+func (cq *containerQueue) Get(uuid) (arvados.Container, bool) {
+ cq.mtx.Lock()
+ defer cq.mtx.Unlock()
+ if ctr, ok := cq.current[uuid]; !ok {
+ return arvados.Container{}, false
+ } else {
+ return *ctr
+ }
+}
+
+func (cq *containerQueue) All() map[string]arvados.Container {
+ cq.mtx.Lock()
+ defer cq.mtx.Unlock()
+ ret := make(map[string]*arvados.Container, len(cq.current))
+ for uuid, ctr := range cq.current {
+ ret[uuid] = *ctr
+ }
+ return ret
+}
+
+func (cq *containerQueue) setup() {
+ cq.current.Store(map[string]*arvados.Container{})
+}
+
+func (cq *containerQueue) update() error {
+ cq.mtx.Lock()
+ cq.keeplocal = map[string]struct{}{}
+ cq.mtx.Unlock()
+
+ defer func() { cq.mtx.Lock(); cq.keeplocal = nil; cq.mtx.Unlock() }()
+
+ next, err := cq.poll()
+ if err != nil {
+ return err
+ }
+ cq.mtx.Lock()
+ defer cq.mtx.Unlock()
+ for uuid := range cq.keeplocal {
+ next[uuid] = cq.current[uuid]
+ }
+ cq.current = next
+}
+
+func (cq *containerQueue) Lock(uuid string) error {
+ return cq.apiUpdate(uuid, "lock")
+}
+
+func (cq *containerQueue) Unlock(uuid string) error {
+ return cq.apiUpdate(uuid, "unlock")
+}
+
+func (cq *containerQueue) apiUpdate(uuid, action string) error {
+ var resp arvados.Container
+ err := cq.Client.RequestAndDecode(&resp, "POST", "arvados/v1/containers/"+uuid+"/"+action, nil, nil)
+ if err != nil {
+ return err
+ }
+
+ cq.mtx.Lock()
+ defer cq.mtx.Unlock()
+ if cq.keeplocal != nil {
+ cq.keeplocal[uuid] = struct{}{}
+ }
+ if ctr, ok := cq.current[uuid]; !ok {
+ cq.current[uuid] = &resp
+ } else {
+ ctr.State, ctr.Priority, ctr.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID
+ }
+ return nil
+}
+
+func (cq *containerQueue) poll() (map[string]*arvados.Container, error) {
+ cq.mtx.Lock()
+ size := len(cq.current)
+ cq.mtx.Unlock()
+
+ next := make(map[string]*arvados.Container, size)
+ apply := func(updates []arvados.Container) {
+ for _, upd := range updates {
+ if next[ctr.UUID] == nil {
+ next[ctr.UUID] = &arvados.Container{}
+ }
+ *next[ctr.UUID] = upd
+ }
+ }
+ selectParam := []string{"uuid", "state", "priority"}
+ limitParam := 1000
+
+ mine, err := cq.fetchAll(arvados.ResourceListParams{
+ Select: selectParam,
+ Order: []string{"uuid"},
+ Limit: &limitParam,
+ Count: "none",
+ Filters: {{"locked_by_uuid", "=", cq.authUUID}},
+ })
+ if err != nil {
+ return nil, err
+ }
+ apply(mine)
+
+ avail, err := cq.fetchAll(arvados.ResourceListParams{
+ Select: selectParam,
+ Order: []string{"uuid"},
+ Limit: &limitParam,
+ Count: "none",
+ Filters: {{"state", "=", Queued}, {"priority", ">", "0"}},
+ })
+ if err != nil {
+ return err
+ }
+ apply(avail)
+
+ var missing []string
+ cq.mtx.Lock()
+ for uuid, ctr := range cq.current {
+ if next[uuid] == nil &&
+ ctr.State != arvados.ContainerStateCancelled &&
+ ctr.state != arvados.ContainerStateComplete {
+ missing = append(missing, uuid)
+ }
+ }
+ cq.mtx.Unlock()
+
+ for i, page := 0, 20; i < len(missing); i += page {
+ batch := missing[i:]
+ if len(batch) > page {
+ batch = batch[:page]
+ }
+ ended, err := cq.fetchAll(arvados.ResourceListParams{
+ Select: selectParam,
+ Order: []string{"uuid"},
+ Count: "none",
+ Filters: {{"uuid", "in", batch}},
+ })
+ if err != nil {
+ return err
+ }
+ apply(ended)
+ }
+ return next, nil
+}
+
+func (cq *containerQueue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.Container, error) {
+ var results []arvados.Container
+ params := initialParams
+ params.Offset = 0
+ for {
+ // This list variable must be a new one declared
+ // inside the loop: otherwise, items in the API
+ // response would get deep-merged into the items
+ // loaded in previous iterations.
+ var list arvados.ContainerList
+
+ err := cq.Client.RequestAndDecode(&list, "GET", "arvados/v1/containers", nil, params)
+ if err != nil {
+ return nil, err
+ }
+ if len(list.Items) == 0 {
+ break
+ }
+
+ results = append(results, list.Items)
+ if len(params.Order) == 1 && params.Order[0] == "uuid" {
+ params.Filters = append(initialParams.Filters, []interface{}{"uuid", ">", list.Items[len(list.Items)-1].UUID})
+ } else {
+ params.Offset += len(list.Items)
+ }
+ }
+ return results, nil
+}
diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
new file mode 100644
index 000000000..90e2c799f
--- /dev/null
+++ b/lib/dispatchcloud/dispatcher.go
@@ -0,0 +1,61 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+ "net/http"
+
+ "git.curoverse.com/arvados.git/lib/cloud"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "github.com/Sirupsen/logrus"
+)
+
+type dispatcher struct {
+ Cluster *arvados.Cluster
+ NodeProfile *arvados.NodeProfile
+
+ logger logger
+ provider cloud.Provider
+ workerPool workerPool
+ queue containerQueue
+ scheduler scheduler
+ syncer syncer
+ staleLockFixer staleLockFixer
+ httpHandler http.Handler
+}
+
+// ServeHTTP implements service.Handler.
+func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ disp.setupOnce.Do(disp.setup)
+ disp.handler.ServeHTTP(w, r)
+}
+
+// CheckHealth implements service.Handler.
+func (disp *dispatcher) CheckHealth() error {
+ disp.setupOnce.Do(disp.setup)
+ return nil
+}
+
+func (disp *dispatcher) setup() {
+ disp.logger = logrus.StandardLogger()
+ disp.provider = &providerProxy{disp.logger, newProvider(disp.Cluster)}
+ disp.workerPool = &workerPool{disp.logger, disp.provider}
+ disp.queue = &containerQueue{disp.logger, disp.Cluster}
+ disp.scheduler = &scheduler{disp.logger, disp.queue, disp.workerPool}
+ disp.syncer = &syncer{disp.logger, disp.queue, disp.workerPool}
+ disp.staleLockFixer = &staleLockFixer{disp.logger, disp.queue, disp.workerPool}
+
+ go func() {
+ disp.workerPool.Start()
+ // staleLockFixer must be ready before scheduler can start
+ disp.staleLockFixer.Wait()
+ go disp.scheduler.Run()
+ go disp.syncer.Run()
+ }()
+
+ mux := http.NewServeMux()
+ mux.Handle("/status.json", disp.serveStatusJSON)
+ disp.httpHandler = mux
+}
diff --git a/lib/dispatchcloud/logger.go b/lib/dispatchcloud/logger.go
new file mode 100644
index 000000000..90bb6ca68
--- /dev/null
+++ b/lib/dispatchcloud/logger.go
@@ -0,0 +1,29 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+ "sync"
+ "time"
+)
+
+type logger interface {
+ Printf(string, ...interface{})
+ Warnf(string, ...interface{})
+ Debugf(string, ...interface{})
+}
+
+var nextSpam = map[string]time.Time{}
+var nextSpamMtx sync.Mutex
+
+func unspam(msg string) bool {
+ nextSpamMtx.Lock()
+ defer nextSpamMtx.Unlock()
+ if nextSpam[msg].Before(time.Now()) {
+ nextSpam[msg] = time.Now().Add(time.Minute)
+ return true
+ }
+ return false
+}
diff --git a/lib/dispatchcloud/readme.go b/lib/dispatchcloud/readme.go
new file mode 100644
index 000000000..a4b005eb8
--- /dev/null
+++ b/lib/dispatchcloud/readme.go
@@ -0,0 +1,79 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+// A dispatcher comprises a container queue, a scheduler, a worker
+// pool, a cloud provider, a stale-lock fixer, and a syncer.
+// 1. Choose a provider.
+// 2. Start a worker pool.
+// 3. Start a container queue.
+// 4. Run a stale-lock fixer.
+// 5. Start a scheduler.
+// 6. Start a syncer.
+//
+//
+// A provider (cloud driver) creates new cloud VM instances and gets
+// the latest list of instances. The returned instances implement
+// proxies to the provider's metadata and control interfaces (get IP
+// address, update tags, shutdown).
+//
+//
+// A workerPool tracks workers' instance types and readiness states
+// (available to do work now, booting, suffering a temporary network
+// outage, shutting down). It loads internal state from the cloud
+// provider's list of instances at startup, and syncs periodically
+// after that.
+//
+//
+// A worker maintains a multiplexed SSH connection to a cloud
+// instance, retrying/reconnecting as needed, so the workerPool can
+// execute commands. It asks the provider's instance to verify its SSH
+// public key once when first connecting, and again later if the key
+// changes.
+//
+//
+// A container queue tracks the known state (according to
+// arvados-controller) of each container of interest -- i.e., queued,
+// or locked/running using our own dispatch token. It also proxies the
+// dispatcher's lock/unlock/cancel requests to the controller. It
+// handles concurrent refresh and update operations without exposing
+// out-of-order updates to its callers. (It drops any new information
+// that might have originated before its own most recent
+// lock/unlock/cancel operation.)
+//
+//
+// A stale-lock fixer waits for any already-locked containers (i.e.,
+// locked by a prior server process) to appear on workers as the
+// worker pool recovers its state. It unlocks/requeues any that still
+// remain when all workers are recovered or shutdown, or its timer
+// expires.
+//
+//
+// A scheduler chooses which containers to assign to which idle
+// workers, and decides what to do when there are not enough idle
+// workers (including shutting down some idle nodes).
+//
+//
+// A syncer updates state to Cancelled when a running container
+// process dies without finalizing its entry in the controller
+// database. It also calls the worker pool to kill containers that
+// have priority=0 while locked or running.
+//
+//
+// A provider proxy wraps a provider with rate-limiting logic. After
+// the wrapped provider receives a cloud.RateLimitError, the proxy
+// starts returning errors to callers immediately without calling
+// through to the wrapped provider.
+//
+//
+// TBD: Bootstrapping script via SSH, too? Future version.
+//
+// TBD: drain instance, keep instance alive
+// TBD: metrics, diagnostics
+// TBD: why dispatch token currently passed to worker?
+//
+// Metrics: queue size, time job has been in queued, #idle/busy/booting nodes
+// Timing in each step, and end-to-end
+// Metrics: boot/idle/alloc time and cost
diff --git a/lib/dispatchcloud/scheduler.go b/lib/dispatchcloud/scheduler.go
new file mode 100644
index 000000000..a7dd3f244
--- /dev/null
+++ b/lib/dispatchcloud/scheduler.go
@@ -0,0 +1,155 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+ "sort"
+ "sync"
+ "time"
+
+ "git.curoverse.com/arvados.git/lib/cloud"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+type container struct {
+ container arvados.Container
+ wantsType arvados.InstanceType
+}
+
+func (c *container) String() string {
+ return c.container.UUID
+}
+
+// A scheduler assigns queued containers to available workers, and
+// creates new workers when there aren't enough available.
+//
+// If it encounters problems creating new workers, the scheduler also
+// shuts down idle workers in case they are consuming quota.
+// Otherwise, workers are responsible for shutting themselves down
+// after the configured idle time threshold.
+type scheduler struct {
+ logger
+ containerQueue
+ workerPool
+
+ queue []*container
+ queueMtx sync.Mutex
+ instances map[cloud.Instance]struct{}
+ enqueueOrUpdate chan container
+ runOnce sync.Once
+}
+
+func (sched *scheduler) try() {
+ ctrs := sched.containerQueue.Current()
+ queue := make([]arvados.Container, 0, len(ctrs))
+ for uuid, ctr := range ctrs {
+ queue = append(queue, ctr)
+ }
+ sort.Slice(queue, func(i, j int) bool {
+ queue[i].Priority > queue[j].Priority
+ })
+ dibs := map[arvados.InstanceType]int{}
+ for _, ctr := range queue {
+ switch {
+ case ctr.State == arvados.ContainerStateQueued && ctr.Priority > 0:
+ it, err := ChooseInstanceType(sched.Cluster, &ctr)
+ if err != nil {
+ sched.logger.Warnf("cannot run %s", &ctr)
+ continue
+ }
+
+ if dibs[it] >= sched.workerPool.Pending(it) {
+ err := sched.workerPool.Create(it)
+ if err != nil {
+ if unspam(err.Error()) {
+ sched.logger.Warnf("scheduler: workerPool.Create: %s", err)
+ }
+ return
+ }
+ }
+ dibs[it]++
+ // ...
+ case ctr.State == arvados.ContainerStateLocked || ctr.State == arvados.ContainerStateRunning:
+ // ...
+ }
+ }
+}
+
+func (sched *scheduler) setup() {
+ sched.enqueueOrUpdate = make(chan container, 1)
+ go sched.run()
+}
+
+func (sched *scheduler) run() {
+ wakeup := make(chan struct{}, 1)
+ workers := map[*worker]*container{}
+ ctrs := map[string]*container{} // key is container UUID
+ timer := time.NewTimer(time.Second)
+ queue := []*container{}
+ for {
+ select {
+ case ctr := <-sched.enqueueOrUpdate:
+ // Get a newly queued container, or update
+ // priority/state.
+ if ctrs[ctr.UUID] == nil {
+ ctrs[ctr.UUID] = &ctr
+ } else {
+ ctrs[ctr.UUID].container = ctr.container
+ continue
+ }
+ case <-timer.C:
+ case <-wakeup:
+ }
+
+ queue = queue[:0]
+ for _, ctr := range ctrs {
+ if ctr.State == arvados.ContainerStateLocked {
+ queue = append(queue, ctr)
+ }
+ }
+ sort.Slice(queue, func(i, j int) bool {
+ if d := queue[i].Priority - queue[j].Priority; d != 0 {
+ return d > 0
+ } else {
+ return queue[i].UUID > queue[j].UUID
+ }
+ })
+
+ // Dispatch highest priority container to the idle
+ // worker with the shortest idle time.
+ for len(queue) > 0 {
+ select {
+ case todo[queue[0].wantsType] <- queue[0]:
+ queue = queue[1:]
+ continue
+ default:
+ }
+ break
+ }
+
+ // Compare queue to booting.
+ }
+}
+
+// DispatchFunc returns a dispatch.DispatchFunc.
+func (sched *scheduler) DispatchFunc(cluster *arvados.Cluster) func(actr arvados.Container, update <-chan arvados.Container) {
+ go sched.runOnce.Do(sched.run)
+ return func(actr arvados.Container, update <-chan arvados.Container) {
+ it, err := ChooseInstanceType(cluster, &actr)
+ if err != nil {
+ return err
+ }
+ sched.enqueueOrUpdate <- container{
+ container: actr,
+ wantsType: it,
+ }
+ for actr := range update {
+ sched.enqueueOrUpdate <- container{
+ container: actr,
+ wantsType: it,
+ }
+ }
+ }
+}
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
new file mode 100644
index 000000000..9e124cf52
--- /dev/null
+++ b/lib/dispatchcloud/worker/pool.go
@@ -0,0 +1,479 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+ "bytes"
+ "errors"
+ "io"
+ "net"
+ "sort"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "git.curoverse.com/arvados.git/lib/cloud"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "github.com/Sirupsen/logrus"
+ "golang.org/x/crypto/ssh"
+)
+
+type State int
+
+const (
+ StateUnknown State = iota // might be running a container already
+ StateBooting // instance is booting
+ StateRunning // instance is running
+ StateShutdown // worker has stopped monitoring the instance
+
+ // TODO: configurable
+ maxPingFailTime = 10 * time.Minute
+ maxBootTime = 20 * time.Minute
+)
+
+var stateString = map[State]string{
+ StateUnknown: "unknown",
+ StateBooting: "booting",
+ StateRunning: "running",
+ StateShutdown: "shutdown",
+}
+
+// String implements fmt.Stringer.
+func (s State) String() string {
+ return stateString[s]
+}
+
+type Pool struct {
+ Logger logrus.FieldLogger
+ Provider cloud.Provider
+
+ subscribers map[chan<- struct{}]bool
+ creating map[arvados.InstanceType]int // goroutines waiting for provider.Create to return
+ workers map[cloud.InstanceID]worker
+ loaded bool // loaded list of instances from provider at least once
+ stop chan bool
+ mtx sync.RWMutex
+}
+
+func (wp *Pool) Start() {
+ wp.setupOnce.Do(wp.setup)
+}
+
+// Subscribe returns a channel that becomes ready whenever a worker's
+// state changes.
+//
+// Example:
+//
+// ch := wp.Subscribe()
+// for range ch {
+// // some worker has become available; try scheduling some work
+// if wantStop {
+// wp.Unsubscribe(ch)
+// break
+// }
+// }
+func (wp *Pool) Subscribe() <-chan struct{} {
+ wp.Start()
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
+ ch := make(chan struct{}, 1)
+ wp.subscribers[ch] = true
+ return ch
+}
+
+// Unsubscribe stops sending updates to the given channel, and closes
+// it.
+func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
+ wp.Start()
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
+ if _, ok := wp.subscribers[ch]; ok {
+ delete(wp.subscribers, ch)
+ close(ch)
+ }
+}
+
+// Pending returns the number of unallocated (booting + idle +
+// unknown) instances of the given type.
+func (wp *Pool) Pending(it arvados.InstanceType) int {
+ wp.Start()
+ wp.mtx.RLock()
+ defer wp.mtx.RUnlock()
+ n := 0
+ for _, wkr := range wp.workers {
+ state, running := wkr.State()
+ if wkr.Type() == it && len(running) == 0 && (state == StateRunning || state == StateBooting || state == StateUnknown) {
+ n++
+ }
+ }
+ return n + wp.creating[it]
+}
+
+// Create a new instance with the given type, and add it to the worker
+// pool. The worker is added immediately; instance creation runs in
+// the background.
+func (wp *Pool) Create(it arvados.InstanceType) error {
+ wp.Start()
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
+ tags := cloud.InstanceTags{"InstanceType": it.Name}
+ wp.creating[it]++
+ go func() {
+ inst, err := wp.Provider.Create(it, wp.imageID, tags, nil)
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
+ wp.creating[it]--
+ if err != nil {
+ wp.Logger.Errorf("worker.Pool: create instance: %s", err)
+ return
+ }
+ if wp.workers[inst.ID()] == nil {
+ wp.workers[inst.ID()] = &worker{wp.Logger, StateBooting, inst, it}
+ }
+ }()
+ return nil
+}
+
+// Shutdown shuts down a worker with the given type, or returns false
+// if all workers with the given type are busy.
+func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
+ wp.Start()
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
+ for _, tryState := range []State{StateBooting, StateRunning} {
+ for _, wkr := range wp.workers {
+ if state, running := wkr.State(); state != tryState || len(running) > 0 {
+ continue
+ }
+ if _, it := wkr.Instance(); wt != it {
+ continue
+ }
+ wkr.Shutdown()
+ return true
+ }
+ }
+ return false
+}
+
+// StartContainer starts a container on an idle worker immediately if
+// possible, otherwise returns false.
+func (wp *Pool) StartContainer(ctr *container) bool {
+ wp.Start()
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
+ var avail []worker
+ for _, wkr := range wp.workers {
+ if _, it := wkr.Instance(); it == ctr.wantsType && wkr.State() == StateRunning {
+ avail = append(avail, wkr)
+ }
+ }
+ // Prefer workers with shorter idle times
+ sort.Slice(avail, func(i, j int) {
+ return avail[i].busy.After(avail[j].busy)
+ })
+ for i, wkr := range avail {
+ if wkr.StartContainer(ctr) {
+ wp.Logger.Debugf("worker.Pool: worker %s accepted container %s", wkr, ctr)
+ return true
+ }
+ }
+ return false
+}
+
+// Stop synchronizing with the provider.
+func (wp *Pool) Stop() {
+ close(wp.stop)
+}
+
+func (wp *Pool) setup() {
+ wp.notify = make(chan worker, 1)
+ wp.creating = map[cloud.InstanceType]int{}
+ wp.workers = map[cloud.InstanceID]worker{}
+ wp.subscribers = map[chan<- struct{}]bool{}
+
+ go wp.handleWorkerNotify()
+ go wp.syncLoop()
+}
+
+func (wp *Pool) handleWorkerNotify() {
+ for {
+ select {
+ case <-wp.notify:
+ case <-wp.stop:
+ return
+ }
+ wp.mtx.RLock()
+ for ch := range wp.subscribers {
+ select {
+ case ch <- struct{}{}:
+ default:
+ }
+ }
+ wp.mtx.RUnlock()
+ }
+}
+
+func (wp *Pool) syncLoop() {
+ var wait time.Duration
+ timer := time.NewTimer(time.Second)
+ for {
+ if !timer.Stop() {
+ <-timer.C
+ }
+ timer.Reset(wait)
+ wp.Logger.Debugf("worker.Pool: wait %s", wait)
+ select {
+ case <-timer.C:
+ case <-wp.stop:
+ return
+ }
+ err := wp.Sync()
+ if err != nil {
+ wp.Logger.Warnf("worker.Pool: error getting instance list: %s", err)
+ wait = 15 * time.Second
+ } else {
+ wait = time.Minute
+ }
+ }
+}
+
+func (wp *Pool) Sync() error {
+ wp.Logger.Debugf("worker.Pool: getting instance list")
+ threshold := time.Now()
+ instances, err := wp.Provider.Instances()
+ if err != nil {
+ return err
+ }
+ wp.sync(threshold, instances)
+ return nil
+}
+
+func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
+
+ updated := map[cloud.InstanceID]bool{}
+
+ for _, inst := range instances {
+ itTag := inst.Tags()["InstanceType"]
+ it, ok := wp.Cluster.InstanceTypes[itTag]
+ if !ok {
+ wp.Logger.Debugf("worker.Pool: instance %s has InstanceType tag %q --- ignoring", inst, itTag)
+ continue
+ }
+ id := inst.ID()
+ if wkr := wp.workers[id]; wkr != nil {
+ wkr.instance = inst
+ } else {
+ wp.workers[id] = &worker{wp.Logger, StateUnknown, inst, it}
+ }
+ updated[id] = true
+ }
+
+ for id, wkr := range wp.workers {
+ if updated[id] {
+ continue
+ }
+ inst := wkr.Instance()
+ if inst != nil && wkr.busy.Before(threshold) {
+ state, _ := wkr.State()
+ wp.Logger.Infof("worker.Pool: instance %s disappeared, shutting down worker with state %s", inst, state)
+ wkr.Shutdown()
+ delete(wp.workers, id)
+ }
+ }
+
+ if !wp.loaded {
+ wp.loaded = true
+ wp.Logger.Infof("worker.Pool: loaded initial set of instances (%d) from provider", len(wp.workers))
+ }
+}
+
+// should be called in a new goroutine
+func (wp *Pool) probeAndUpdate(wkr *worker) {
+ wp.mtx.Lock()
+ updated := wkr.updated
+ booted := wkr.booted
+ wp.mtx.Unlock()
+
+ var err error
+ if !booted {
+ err = wkr.probeBooted(wp.BootProbeCommand)
+ if err == nil {
+ booted = true
+ wp.Logger.Infof("worker.Pool: instance %s booted", wkr.instance)
+ }
+ }
+ var running []string
+ if err == nil && booted {
+ running, err = wkr.probeRunning()
+ }
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
+ wkr.booted = booted
+ if err != nil {
+ if wkr.state != StateShutdown {
+ elapsed := time.Since(wkr.probed)
+ wp.Logger.Infof("worker.Pool: instance %s not responding for %s: %s", inst, elapsed, err)
+
+ label, threshold := "", maxPingFailTime
+ if wkr.state == StateBooting {
+ label, threshold = "new ", maxBootTime
+ }
+ if elapsed > threshold {
+ wp.Logger.Warnf("worker.Pool: %sinstance %s unresponsive since %s; shutting down", label, inst, wkr.probed)
+ inst.Destroy()
+ }
+ }
+ return
+ }
+ wkr.probed = time.Now()
+ if len(running) > 0 {
+ wkr.busy = time.Now()
+ }
+ if wkr.state == StateShutdown {
+ } else if booted {
+ wkr.state = StateRunning
+ } else {
+ wkr.state = StateBooting
+ }
+ if starts == wkr.starts {
+ // We haven't started any new work since starting the
+ // probe, so this is the latest available information.
+ wkr.running = running
+ }
+ wp.notify <- wkr
+}
+
+type worker struct {
+ logger *logrus.FieldLogger
+ state State
+ instance cloud.Instance
+ instType arvados.InstanceType
+ probed time.Time
+ updated time.Time
+ busy time.Time
+ verifiedKey atomic.Value
+ client *ssh.Client
+ clientErr error
+ clientOnce sync.Once
+ clientSetup chan bool
+}
+
+// Create a new SSH session. If session setup fails or the SSH client
+// hasn't been setup yet, setup a new SSH client and try again.
+func (wkr *worker) newSession() (*ssh.Session, error) {
+ try := func(create bool) (*ssh.Session, error) {
+ client, err := wkr.sshClient(create)
+ if err != nil {
+ return nil, err
+ }
+ return client.NewSession()
+ }
+ session, err := try(false)
+ if err != nil {
+ session, err = try(true)
+ }
+ return session, err
+}
+
+// Get the latest SSH client. If another goroutine is in the process
+// of setting one up, wait for it to finish and return its result (or
+// the last successfully setup client, if it fails).
+func (wkr *worker) sshClient(create bool) (*ssh.Client, error) {
+ wkr.clientOnce.Do(func() {
+ wkr.clientSetup = make(chan bool, 1)
+ wkr.clientErr = errors.New("client not yet created")
+ })
+ defer func() { <-wkr.clientSetup }()
+ select {
+ case wkr.clientSetup <- true:
+ if create {
+ client, err := wkr.setupSSHClient()
+ if err == nil || wkr.client == nil {
+ wkr.client, wkr.clientErr = client, err
+ }
+ if err != nil {
+ return nil, err
+ }
+ }
+ default:
+ // Another goroutine is doing the above case. Wait
+ // for it to finish and return whatever it leaves in
+ // wkr.client.
+ wkr.clientSetup <- true
+ }
+ return wkr.client, wkr.clientErr
+}
+
+// Create a new SSH client.
+func (wkr *worker) setupSSHClient() (*ssh.Client, error) {
+ addr := instance.Address()
+ if addr == "" {
+ return nil, errors.New("instance has no address")
+ }
+ var receivedKey ssh.PublicKey
+ client, err := ssh.Dial("tcp", addr, &ssh.ClientConfig{
+ User: "root",
+ Auth: []ssh.AuthMethod{
+ ssh.Password("1234"),
+ },
+ HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error {
+ receivedKey = key
+ return nil
+ },
+ Timeout: time.Minute,
+ })
+ if err != nil {
+ return nil, err
+ } else if key == nil {
+ return nil, errors.New("BUG: key was never provided to HostKeyCallback")
+ }
+
+ existingKey, _ := wkr.publicKey.Load().(ssh.PublicKey)
+ if existingKey == nil || !bytes.Equal(existingKey.Marshal(), receivedKey.Marshal()) {
+ err = wkr.instance.VerifyPublicKey(receivedKey, client.Dial)
+ if err != nil {
+ return nil, err
+ }
+ wkr.publicKey.Store(receivedKey)
+ }
+ return client, nil
+}
+
+func (wkr *worker) execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
+ session, err := wkr.newSession()
+ if err != nil {
+ return nil, nil, err
+ }
+ defer session.Close()
+ var stdout, stderr bytes.Buffer
+ session.Stdout = stdout
+ session.Stderr = stderr
+ if stdin != nil {
+ session.Stdin = stdin
+ }
+ wkr.logger.Debugf("worker.execute: %s: running command: %s", sc.host, cmd)
+ err = session.Run(cmd)
+ return stdout.Bytes(), stderr.Bytes(), err
+}
+
+func (wkr *worker) probeRunning() (running []string, err error) {
+ stdout, stderr, err := wkr.execute("crunch-run --probe", nil)
+ if err != nil {
+ return
+ }
+ running = strings.Split(string(bytes.TrimRight("\n")), "\n")
+ return
+}
+
+func (wkr *worker) probeBooted(cmd string) error {
+ if cmd == "" {
+ cmd = "true"
+ }
+ _, _, err := wkr.execute(cmd, nil)
+ return err
+}
diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
new file mode 100644
index 000000000..30e9db6b5
--- /dev/null
+++ b/lib/dispatchcloud/worker/pool_test.go
@@ -0,0 +1,49 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "github.com/Sirupsen/logrus"
+ check "gopkg.in/check.v1"
+)
+
+const GiB arvados.ByteSize = 1 << 30
+
+var _ = check.Suite(&NodeSizeSuite{})
+
+type PoolSuite struct{}
+
+func (*PoolSuite) TestCreateAvail(c *check.C) {
+ createAfter := make(chan struct{})
+ provider := &LameInstanceProvider{
+ CreateAfter: createAfter,
+ }
+ pool := &Pool{
+ Logger: logrus.StandardLogger(),
+ Provider: provider,
+ }
+ notify := pool.Subscribe()
+ defer pool.Unsubscribe(notify)
+ type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
+ type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .01}
+ c.Check(pool.Pending(type1), check.Equals, 0)
+ c.Check(pool.Pending(type2), check.Equals, 0)
+ pool.Create(type2)
+ pool.Create(type1)
+ pool.Create(type2)
+ c.Check(pool.Pending(type1), check.Equals, 1)
+ c.Check(pool.Pending(type2), check.Equals, 2)
+ close(createAfter)
+ pool.Sync()
+ c.Check(pool.Pending(type1), check.Equals, 0)
+ c.Check(pool.Pending(type2), check.Equals, 0)
+ select {
+ case _, ok := <-notify:
+ c.Check(ok, check.Equals, true)
+ default:
+ c.Error("nothing received on notify channel")
+ }
+}
diff --git a/sdk/go/arvados/container.go b/sdk/go/arvados/container.go
index 210ed9981..ef3547048 100644
--- a/sdk/go/arvados/container.go
+++ b/sdk/go/arvados/container.go
@@ -18,7 +18,7 @@ type Container struct {
Mounts map[string]Mount `json:"mounts"`
Output string `json:"output"`
OutputPath string `json:"output_path"`
- Priority int `json:"priority"`
+ Priority int64 `json:"priority"`
RuntimeConstraints RuntimeConstraints `json:"runtime_constraints"`
State ContainerState `json:"state"`
SchedulingParameters SchedulingParameters `json:"scheduling_parameters"`
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list