[ARVADOS] updated: 1.2.0-9-g72c5f4651

Git user git at public.curoverse.com
Thu Aug 23 11:49:33 EDT 2018


Summary of changes:
 lib/cloud/interfaces.go              |  9 ++---
 lib/dispatchcloud/container_queue.go | 70 ++++++++++++++++++++++++++++--------
 lib/dispatchcloud/logger.go          | 18 ++++++++++
 lib/dispatchcloud/scheduler.go       | 36 +++++++++++++++++++
 lib/dispatchcloud/worker_pool.go     |  1 -
 5 files changed, 114 insertions(+), 20 deletions(-)

  discards  1411934711aa775edfe2af74bdce3e2b076a3dfa (commit)
       via  72c5f4651e75f0833f133550245f68405a7ad161 (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the old revision is not a strict subset of the new revision.  This
situation occurs when you --force push a change and generate a repository
containing something like this:

 * -- * -- B -- O -- O -- O (1411934711aa775edfe2af74bdce3e2b076a3dfa)
            \
             N -- N -- N (72c5f4651e75f0833f133550245f68405a7ad161)

When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 72c5f4651e75f0833f133550245f68405a7ad161
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Aug 23 09:29:26 2018 -0400

    13964: sketch
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/build/run-build-packages.sh b/build/run-build-packages.sh
index caebac013..11588913e 100755
--- a/build/run-build-packages.sh
+++ b/build/run-build-packages.sh
@@ -295,6 +295,8 @@ package_go_binary cmd/arvados-server arvados-server \
     "Arvados server daemons"
 package_go_binary cmd/arvados-server arvados-controller \
     "Arvados cluster controller daemon"
+package_go_binary cmd/arvados-server arvados-dispatch-cloud \
+    "Arvados cluster cloud dispatch"
 package_go_binary sdk/go/crunchrunner crunchrunner \
     "Crunchrunner executes a command inside a container and uploads the output"
 package_go_binary services/arv-git-httpd arvados-git-httpd \
diff --git a/cmd/arvados-server/arvados-dispatch-cloud.service b/cmd/arvados-server/arvados-dispatch-cloud.service
new file mode 100644
index 000000000..5ea5d45e7
--- /dev/null
+++ b/cmd/arvados-server/arvados-dispatch-cloud.service
@@ -0,0 +1,28 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+[Unit]
+Description=Arvados cloud dispatch
+Documentation=https://doc.arvados.org/
+After=network.target
+AssertPathExists=/etc/arvados/config.yml
+
+# systemd==229 (ubuntu:xenial) obeys StartLimitInterval in the [Unit] section
+StartLimitInterval=0
+
+# systemd>=230 (debian:9) obeys StartLimitIntervalSec in the [Unit] section
+StartLimitIntervalSec=0
+
+[Service]
+Type=notify
+EnvironmentFile=-/etc/arvados/environment
+ExecStart=/usr/bin/arvados-dispatch-cloud
+Restart=always
+RestartSec=1
+
+# systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section
+StartLimitInterval=0
+
+[Install]
+WantedBy=multi-user.target
diff --git a/cmd/arvados-server/cmd.go b/cmd/arvados-server/cmd.go
index 1af3745df..cd15d25dd 100644
--- a/cmd/arvados-server/cmd.go
+++ b/cmd/arvados-server/cmd.go
@@ -9,6 +9,7 @@ import (
 
 	"git.curoverse.com/arvados.git/lib/cmd"
 	"git.curoverse.com/arvados.git/lib/controller"
+	"git.curoverse.com/arvados.git/lib/dispatchcloud"
 )
 
 var (
@@ -18,7 +19,8 @@ var (
 		"-version":  cmd.Version(version),
 		"--version": cmd.Version(version),
 
-		"controller": controller.Command,
+		"controller":     controller.Command,
+		"dispatch-cloud": dispatchcloud.Command,
 	})
 )
 
diff --git a/lib/cloud/interfaces.go b/lib/cloud/interfaces.go
new file mode 100644
index 000000000..071339573
--- /dev/null
+++ b/lib/cloud/interfaces.go
@@ -0,0 +1,87 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package cloud
+
+import (
+	"net"
+	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"golang.org/x/crypto/ssh"
+)
+
+// A RateLimitError should be returned by a Provider when the cloud
+// service indicates it is rejecting all API calls for some time
+// interval.
+type RateLimitError interface {
+	// Time before which the caller should expect requests to
+	// fail.
+	EarliestRetry() time.Time
+	error
+}
+
+// A QuotaError should be returned by a Provider when the cloud
+// service indicates the account cannot create more VMs than already
+// exist.
+type QuotaError interface {
+	// If true, don't create more instances until some existing
+	// instances are destroyed. If false, don't handle the error
+	// as a quota error.
+	IsQuotaError() bool
+	error
+}
+
+type InstanceTags map[string]string
+type InstanceID string
+type ImageID string
+
+// Instance is implemented by the provider-specific instance types.
+type Instance interface {
+	// ID returns the provider's instance ID. It must be stable
+	// for the life of the instance.
+	ID() InstanceID
+
+	// String typically returns the cloud-provided instance ID.
+	String() string
+
+	// Cloud provider's "instance type" ID. Matches a ProviderType
+	// in the cluster's InstanceTypes configuration.
+	ProviderType() string
+
+	// Get current tags
+	Tags() InstanceTags
+
+	// Replace tags with the given tags
+	SetTags(InstanceTags) error
+
+	// Shut down the node
+	Destroy() error
+
+	// SSH server hostname or IP address, or empty string if
+	// unknown while instance is booting.
+	Address() string
+
+	// Return nil if the given public key matches the instance's
+	// SSH server key. If the provided Dialer is not nil,
+	// VerifyPublicKey can use it to make outgoing network
+	// connections from the instance -- e.g., to use the cloud's
+	// "this instance's metadata" API.
+	VerifyPublicKey(ssh.PublicKey, net.Dialer) error
+}
+
+type Provider interface {
+	// Create a new instance. If supported by the driver, add the
+	// provided public key to /root/.ssh/authorized_keys.
+	//
+	// The returned error should implement RateLimitError and
+	// QuotaError where applicable.
+	Create(arvados.InstanceType, ImageID, InstanceTags, ssh.PublicKey) (Instance, error)
+
+	// Return all instances, including ones that are booting or
+	// shutting down. Optionally, filter out nodes that don't have
+	// all of the given InstanceTags (the caller will ignore these
+	// anyway).
+	Instances(InstanceTags) ([]Instance, error)
+}
diff --git a/lib/dispatchcloud/cmd.go b/lib/dispatchcloud/cmd.go
new file mode 100644
index 000000000..b2bc91300
--- /dev/null
+++ b/lib/dispatchcloud/cmd.go
@@ -0,0 +1,17 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+	"git.curoverse.com/arvados.git/lib/cmd"
+	"git.curoverse.com/arvados.git/lib/service"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+var Command cmd.Handler = service.Command(arvados.ServiceNameDispatchCloud, newHandler)
+
+func newHandler(cluster *arvados.Cluster, np *arvados.NodeProfile) service.Handler {
+	return &dispatcher{Cluster: cluster, NodeProfile: np}
+}
diff --git a/lib/dispatchcloud/container_queue.go b/lib/dispatchcloud/container_queue.go
new file mode 100644
index 000000000..f8c0512b6
--- /dev/null
+++ b/lib/dispatchcloud/container_queue.go
@@ -0,0 +1,201 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+	"sync"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+type containerQueue struct {
+	Logger  logger
+	Cluster *arvados.Cluster
+	Client  *arvados.Client
+
+	current   map[string]*arvados.Container
+	mtx       sync.Mutex
+	keeplocal map[string]struct{}
+}
+
+// Forget drops the cached record for the given container UUID, but
+// only if its cached state is final (Complete or Cancelled). It does
+// nothing if the UUID is unknown or the container is in any other
+// state.
+func (cq *containerQueue) Forget(uuid string) {
+	cq.mtx.Lock()
+	defer cq.mtx.Unlock()
+	ctr, ok := cq.current[uuid]
+	if !ok || ctr == nil {
+		// Nothing cached for this UUID. Checking ctr.State here
+		// would dereference a nil pointer.
+		return
+	}
+	if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled {
+		delete(cq.current, uuid)
+	}
+}
+
+// Get returns a copy of the cached record for the given container
+// UUID. The second return value is false (and the record is empty)
+// if the queue has no entry for that UUID.
+func (cq *containerQueue) Get(uuid string) (arvados.Container, bool) {
+	cq.mtx.Lock()
+	defer cq.mtx.Unlock()
+	ctr, ok := cq.current[uuid]
+	if !ok || ctr == nil {
+		return arvados.Container{}, false
+	}
+	return *ctr, true
+}
+
+// All returns a snapshot of the queue: a newly allocated map from
+// container UUID to a copy of the cached record. The caller may
+// modify the returned map and its values without affecting the
+// queue.
+func (cq *containerQueue) All() map[string]arvados.Container {
+	cq.mtx.Lock()
+	defer cq.mtx.Unlock()
+	// The map holds values, not pointers, to match the declared
+	// return type and to keep callers away from the shared records.
+	ret := make(map[string]arvados.Container, len(cq.current))
+	for uuid, ctr := range cq.current {
+		ret[uuid] = *ctr
+	}
+	return ret
+}
+
+// setup initializes the (empty) map of known containers. cq.current
+// is a plain map guarded by cq.mtx, so it is assigned directly.
+func (cq *containerQueue) setup() {
+	cq.current = map[string]*arvados.Container{}
+}
+
+// update replaces the cached queue with a fresh copy obtained from
+// poll, and returns poll's error, if any.
+//
+// While the poll is in progress, apiUpdate records in cq.keeplocal
+// the UUID of every container it modifies. Those entries are carried
+// over from the old cache rather than being overwritten by (possibly
+// older) poll results.
+func (cq *containerQueue) update() error {
+	cq.mtx.Lock()
+	cq.keeplocal = map[string]struct{}{}
+	cq.mtx.Unlock()
+
+	// Stop tracking local updates when this refresh ends, whether
+	// or not it succeeded. This runs after the deferred Unlock
+	// below, so it must take the lock itself.
+	defer func() {
+		cq.mtx.Lock()
+		cq.keeplocal = nil
+		cq.mtx.Unlock()
+	}()
+
+	next, err := cq.poll()
+	if err != nil {
+		return err
+	}
+	cq.mtx.Lock()
+	defer cq.mtx.Unlock()
+	for uuid := range cq.keeplocal {
+		// Skip entries that were removed (see Forget) after
+		// being updated locally; copying them would store a nil
+		// pointer in the new map.
+		if ctr, ok := cq.current[uuid]; ok {
+			next[uuid] = ctr
+		}
+	}
+	cq.current = next
+	return nil
+}
+
+func (cq *containerQueue) Lock(uuid string) error {
+	return cq.apiUpdate(uuid, "lock")
+}
+
+func (cq *containerQueue) Unlock(uuid string) error {
+	return cq.apiUpdate(uuid, "unlock")
+}
+
+func (cq *containerQueue) apiUpdate(uuid, action string) error {
+	var resp arvados.Container
+	err := cq.Client.RequestAndDecode(&resp, "POST", "arvados/v1/containers/"+uuid+"/"+action, nil, nil)
+	if err != nil {
+		return err
+	}
+
+	cq.mtx.Lock()
+	defer cq.mtx.Unlock()
+	if cq.keeplocal != nil {
+		cq.keeplocal[uuid] = struct{}{}
+	}
+	if ctr, ok := cq.current[uuid]; !ok {
+		cq.current[uuid] = &resp
+	} else {
+		ctr.State, ctr.Priority, ctr.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID
+	}
+	return nil
+}
+
+// poll fetches the latest container records from the API server and
+// returns them as a new map keyed by container UUID. It does not
+// modify cq.current.
+//
+// Three sets of containers are loaded, later results overwriting
+// earlier ones:
+//
+// 1. Containers locked by cq.authUUID.
+//
+// 2. Queued containers with priority > 0.
+//
+// 3. Containers that are in cq.current with a cached state other
+// than Cancelled or Complete, but did not appear in 1 or 2. These
+// are re-fetched by UUID, 20 at a time.
+//
+// If any request fails, poll returns a nil map and the error.
+func (cq *containerQueue) poll() (map[string]*arvados.Container, error) {
+	cq.mtx.Lock()
+	size := len(cq.current)
+	cq.mtx.Unlock()
+
+	next := make(map[string]*arvados.Container, size)
+	apply := func(updates []arvados.Container) {
+		for _, upd := range updates {
+			if next[upd.UUID] == nil {
+				next[upd.UUID] = &arvados.Container{}
+			}
+			*next[upd.UUID] = upd
+		}
+	}
+	selectParam := []string{"uuid", "state", "priority"}
+	limitParam := 1000
+
+	// NOTE(review): assumes ResourceListParams.Filters is a
+	// []arvados.Filter and Order is a []string -- confirm against
+	// the SDK.
+	mine, err := cq.fetchAll(arvados.ResourceListParams{
+		Select:  selectParam,
+		Order:   []string{"uuid"},
+		Limit:   &limitParam,
+		Count:   "none",
+		Filters: []arvados.Filter{{"locked_by_uuid", "=", cq.authUUID}},
+	})
+	if err != nil {
+		return nil, err
+	}
+	apply(mine)
+
+	avail, err := cq.fetchAll(arvados.ResourceListParams{
+		Select:  selectParam,
+		Order:   []string{"uuid"},
+		Limit:   &limitParam,
+		Count:   "none",
+		Filters: []arvados.Filter{{"state", "=", arvados.ContainerStateQueued}, {"priority", ">", "0"}},
+	})
+	if err != nil {
+		return nil, err
+	}
+	apply(avail)
+
+	// Containers we were tracking that are no longer locked by us
+	// or queued: look them up individually to learn their new
+	// state.
+	var missing []string
+	cq.mtx.Lock()
+	for uuid, ctr := range cq.current {
+		if next[uuid] == nil &&
+			ctr.State != arvados.ContainerStateCancelled &&
+			ctr.State != arvados.ContainerStateComplete {
+			missing = append(missing, uuid)
+		}
+	}
+	cq.mtx.Unlock()
+
+	for i, page := 0, 20; i < len(missing); i += page {
+		batch := missing[i:]
+		if len(batch) > page {
+			batch = batch[:page]
+		}
+		ended, err := cq.fetchAll(arvados.ResourceListParams{
+			Select:  selectParam,
+			Order:   []string{"uuid"},
+			Count:   "none",
+			Filters: []arvados.Filter{{"uuid", "in", batch}},
+		})
+		if err != nil {
+			return nil, err
+		}
+		apply(ended)
+	}
+	return next, nil
+}
+
+// fetchAll retrieves every container matching initialParams, issuing
+// as many list requests as needed, and returns the concatenated
+// items. It stops at the first empty page, or returns nil and the
+// error if a request fails.
+//
+// When the requested order is exactly "uuid", each subsequent
+// request adds a "uuid > last UUID received" filter to the caller's
+// filters. Otherwise Offset is advanced by the number of items
+// received so far. Any Offset in initialParams is ignored.
+//
+// NOTE(review): assumes ResourceListParams.Filters is a
+// []arvados.Filter and Order is a []string -- confirm against the
+// SDK.
+func (cq *containerQueue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.Container, error) {
+	var results []arvados.Container
+	params := initialParams
+	params.Offset = 0
+	for {
+		// This list variable must be a new one declared
+		// inside the loop: otherwise, items in the API
+		// response would get deep-merged into the items
+		// loaded in previous iterations.
+		var list arvados.ContainerList
+
+		err := cq.Client.RequestAndDecode(&list, "GET", "arvados/v1/containers", nil, params)
+		if err != nil {
+			return nil, err
+		}
+		if len(list.Items) == 0 {
+			break
+		}
+
+		results = append(results, list.Items...)
+		if len(params.Order) == 1 && params.Order[0] == "uuid" {
+			// Build the filter list in a fresh slice so the
+			// append can never write into the backing array
+			// of the caller's Filters.
+			last := list.Items[len(list.Items)-1].UUID
+			filters := make([]arvados.Filter, 0, len(initialParams.Filters)+1)
+			filters = append(filters, initialParams.Filters...)
+			params.Filters = append(filters, arvados.Filter{"uuid", ">", last})
+		} else {
+			params.Offset += len(list.Items)
+		}
+	}
+	return results, nil
+}
diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
new file mode 100644
index 000000000..90e2c799f
--- /dev/null
+++ b/lib/dispatchcloud/dispatcher.go
@@ -0,0 +1,61 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+	"net/http"
+
+	"git.curoverse.com/arvados.git/lib/cloud"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"github.com/Sirupsen/logrus"
+)
+
+type dispatcher struct {
+	Cluster     *arvados.Cluster
+	NodeProfile *arvados.NodeProfile
+
+	logger         logger
+	provider       cloud.Provider
+	workerPool     workerPool
+	queue          containerQueue
+	scheduler      scheduler
+	syncer         syncer
+	staleLockFixer staleLockFixer
+	httpHandler    http.Handler
+}
+
+// ServeHTTP implements service.Handler.
+//
+// It runs setup on first use, then delegates to disp.httpHandler,
+// which is the request mux that setup installs.
+func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	disp.setupOnce.Do(disp.setup)
+	disp.httpHandler.ServeHTTP(w, r)
+}
+
+// CheckHealth implements service.Handler.
+func (disp *dispatcher) CheckHealth() error {
+	disp.setupOnce.Do(disp.setup)
+	return nil
+}
+
+func (disp *dispatcher) setup() {
+	disp.logger = logrus.StandardLogger()
+	disp.provider = &providerProxy{disp.logger, newProvider(disp.Cluster)}
+	disp.workerPool = &workerPool{disp.logger, disp.provider}
+	disp.queue = &containerQueue{disp.logger, disp.Cluster}
+	disp.scheduler = &scheduler{disp.logger, disp.queue, disp.workerPool}
+	disp.syncer = &syncer{disp.logger, disp.queue, disp.workerPool}
+	disp.staleLockFixer = &staleLockFixer{disp.logger, disp.queue, disp.workerPool}
+
+	go func() {
+		disp.workerPool.Start()
+		// staleLockFixer must be ready before scheduler can start
+		disp.staleLockFixer.Wait()
+		go disp.scheduler.Run()
+		go disp.syncer.Run()
+	}()
+
+	mux := http.NewServeMux()
+	mux.Handle("/status.json", disp.serveStatusJSON)
+	disp.httpHandler = mux
+}
diff --git a/lib/dispatchcloud/logger.go b/lib/dispatchcloud/logger.go
new file mode 100644
index 000000000..90bb6ca68
--- /dev/null
+++ b/lib/dispatchcloud/logger.go
@@ -0,0 +1,29 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+	"sync"
+	"time"
+)
+
+type logger interface {
+	Printf(string, ...interface{})
+	Warnf(string, ...interface{})
+	Debugf(string, ...interface{})
+}
+
+// Rate-limiting state for unspam: for each message, the earliest
+// time at which it may be reported again. Guarded by nextSpamMtx.
+var (
+	nextSpamMtx sync.Mutex
+	nextSpam    = map[string]time.Time{}
+)
+
+// unspam reports whether msg should be logged now. It returns true
+// the first time it sees a given message, and afterwards only once
+// the minute following the last true result has elapsed.
+func unspam(msg string) bool {
+	nextSpamMtx.Lock()
+	defer nextSpamMtx.Unlock()
+	if !nextSpam[msg].Before(time.Now()) {
+		// Still inside the quiet period for this message.
+		return false
+	}
+	nextSpam[msg] = time.Now().Add(time.Minute)
+	return true
+}
diff --git a/lib/dispatchcloud/readme.go b/lib/dispatchcloud/readme.go
new file mode 100644
index 000000000..a4b005eb8
--- /dev/null
+++ b/lib/dispatchcloud/readme.go
@@ -0,0 +1,79 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+// A dispatcher comprises a container queue, a scheduler, a worker
+// pool, a cloud provider, a stale-lock fixer, and a syncer.
+// 1. Choose a provider.
+// 2. Start a worker pool.
+// 3. Start a container queue.
+// 4. Run a stale-lock fixer.
+// 5. Start a scheduler.
+// 6. Start a syncer.
+//
+//
+// A provider (cloud driver) creates new cloud VM instances and gets
+// the latest list of instances. The returned instances implement
+// proxies to the provider's metadata and control interfaces (get IP
+// address, update tags, shutdown).
+//
+//
+// A workerPool tracks workers' instance types and readiness states
+// (available to do work now, booting, suffering a temporary network
+// outage, shutting down). It loads internal state from the cloud
+// provider's list of instances at startup, and syncs periodically
+// after that.
+//
+//
+// A worker maintains a multiplexed SSH connection to a cloud
+// instance, retrying/reconnecting as needed, so the workerPool can
+// execute commands. It asks the provider's instance to verify its SSH
+// public key once when first connecting, and again later if the key
+// changes.
+//
+//
+// A container queue tracks the known state (according to
+// arvados-controller) of each container of interest -- i.e., queued,
+// or locked/running using our own dispatch token. It also proxies the
+// dispatcher's lock/unlock/cancel requests to the controller. It
+// handles concurrent refresh and update operations without exposing
+// out-of-order updates to its callers. (It drops any new information
+// that might have originated before its own most recent
+// lock/unlock/cancel operation.)
+//
+//
+// A stale-lock fixer waits for any already-locked containers (i.e.,
+// locked by a prior server process) to appear on workers as the
+// worker pool recovers its state. It unlocks/requeues any that still
+// remain when all workers are recovered or shut down, or its timer
+// expires.
+//
+//
+// A scheduler chooses which containers to assign to which idle
+// workers, and decides what to do when there are not enough idle
+// workers (including shutting down some idle nodes).
+//
+//
+// A syncer updates state to Cancelled when a running container
+// process dies without finalizing its entry in the controller
+// database. It also calls the worker pool to kill containers that
+// have priority=0 while locked or running.
+//
+//
+// A provider proxy wraps a provider with rate-limiting logic. After
+// the wrapped provider receives a cloud.RateLimitError, the proxy
+// starts returning errors to callers immediately without calling
+// through to the wrapped provider.
+//
+//
+// TBD: Bootstrapping script via SSH, too? Future version.
+//
+// TBD: drain instance, keep instance alive
+// TBD: metrics, diagnostics
+// TBD: why dispatch token currently passed to worker?
+//
+// Metrics: queue size, time job has been queued, #idle/busy/booting nodes
+// Timing in each step, and end-to-end
+// Metrics: boot/idle/alloc time and cost
diff --git a/lib/dispatchcloud/scheduler.go b/lib/dispatchcloud/scheduler.go
new file mode 100644
index 000000000..a7dd3f244
--- /dev/null
+++ b/lib/dispatchcloud/scheduler.go
@@ -0,0 +1,155 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+	"sort"
+	"sync"
+	"time"
+
+	"git.curoverse.com/arvados.git/lib/cloud"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+type container struct {
+	container arvados.Container
+	wantsType arvados.InstanceType
+}
+
+func (c *container) String() string {
+	return c.container.UUID
+}
+
+// A scheduler assigns queued containers to available workers, and
+// creates new workers when there aren't enough available.
+//
+// If it encounters problems creating new workers, the scheduler also
+// shuts down idle workers in case they are consuming quota.
+// Otherwise, workers are responsible for shutting themselves down
+// after the configured idle time threshold.
+type scheduler struct {
+	logger
+	containerQueue
+	workerPool
+
+	queue           []*container
+	queueMtx        sync.Mutex
+	instances       map[cloud.Instance]struct{}
+	enqueueOrUpdate chan container
+	runOnce         sync.Once
+}
+
+// try makes one scheduling pass over the container queue.
+//
+// Containers are considered in order of decreasing priority. For
+// each queued container with priority > 0, try chooses an instance
+// type. If the containers seen so far already account for all of
+// the pool's pending (unallocated) instances of that type, it asks
+// the worker pool to create another one. A container whose instance
+// type cannot be chosen is logged and skipped. The pass is abandoned
+// at the first Create error.
+func (sched *scheduler) try() {
+	ctrs := sched.containerQueue.All()
+	queue := make([]arvados.Container, 0, len(ctrs))
+	for _, ctr := range ctrs {
+		queue = append(queue, ctr)
+	}
+	sort.Slice(queue, func(i, j int) bool {
+		return queue[i].Priority > queue[j].Priority
+	})
+	// dibs counts, per instance type, the containers in this pass
+	// that have already claimed a pending instance.
+	dibs := map[arvados.InstanceType]int{}
+	for _, ctr := range queue {
+		switch {
+		case ctr.State == arvados.ContainerStateQueued && ctr.Priority > 0:
+			it, err := ChooseInstanceType(sched.Cluster, &ctr)
+			if err != nil {
+				sched.logger.Warnf("cannot run %s: %s", ctr.UUID, err)
+				continue
+			}
+
+			if dibs[it] >= sched.workerPool.Pending(it) {
+				err := sched.workerPool.Create(it)
+				if err != nil {
+					if unspam(err.Error()) {
+						sched.logger.Warnf("scheduler: workerPool.Create: %s", err)
+					}
+					return
+				}
+			}
+			dibs[it]++
+			// ...
+		case ctr.State == arvados.ContainerStateLocked || ctr.State == arvados.ContainerStateRunning:
+			// ...
+		}
+	}
+}
+
+func (sched *scheduler) setup() {
+	sched.enqueueOrUpdate = make(chan container, 1)
+	go sched.run()
+}
+
+func (sched *scheduler) run() {
+	wakeup := make(chan struct{}, 1)
+	workers := map[*worker]*container{}
+	ctrs := map[string]*container{} // key is container UUID
+	timer := time.NewTimer(time.Second)
+	queue := []*container{}
+	for {
+		select {
+		case ctr := <-sched.enqueueOrUpdate:
+			// Get a newly queued container, or update
+			// priority/state.
+			if ctrs[ctr.UUID] == nil {
+				ctrs[ctr.UUID] = &ctr
+			} else {
+				ctrs[ctr.UUID].container = ctr.container
+				continue
+			}
+		case <-timer.C:
+		case <-wakeup:
+		}
+
+		queue = queue[:0]
+		for _, ctr := range ctrs {
+			if ctr.State == arvados.ContainerStateLocked {
+				queue = append(queue, ctr)
+			}
+		}
+		sort.Slice(queue, func(i, j int) bool {
+			if d := queue[i].Priority - queue[j].Priority; d != 0 {
+				return d > 0
+			} else {
+				return queue[i].UUID > queue[j].UUID
+			}
+		})
+
+		// Dispatch highest priority container to the idle
+		// worker with the shortest idle time.
+		for len(queue) > 0 {
+			select {
+			case todo[queue[0].wantsType] <- queue[0]:
+				queue = queue[1:]
+				continue
+			default:
+			}
+			break
+		}
+
+		// Compare queue to booting.
+	}
+}
+
+// DispatchFunc returns a dispatch.DispatchFunc.
+//
+// Calling DispatchFunc starts the scheduler's run loop in the
+// background (at most once). The returned func chooses an instance
+// type for the given container, submits the container to the run
+// loop, and then forwards every record received on the update
+// channel until that channel is closed. If no instance type can be
+// chosen, it logs a warning and returns without submitting anything.
+//
+// NOTE(review): sched.enqueueOrUpdate is created only in setup(). If
+// setup() has not been called, the sends below block forever.
+func (sched *scheduler) DispatchFunc(cluster *arvados.Cluster) func(actr arvados.Container, update <-chan arvados.Container) {
+	go sched.runOnce.Do(sched.run)
+	return func(actr arvados.Container, update <-chan arvados.Container) {
+		it, err := ChooseInstanceType(cluster, &actr)
+		if err != nil {
+			// This func has no error result, so report the
+			// problem here instead of returning it.
+			sched.logger.Warnf("cannot run %s: %s", actr.UUID, err)
+			return
+		}
+		sched.enqueueOrUpdate <- container{
+			container: actr,
+			wantsType: it,
+		}
+		for actr := range update {
+			sched.enqueueOrUpdate <- container{
+				container: actr,
+				wantsType: it,
+			}
+		}
+	}
+}
diff --git a/lib/dispatchcloud/worker_pool.go b/lib/dispatchcloud/worker_pool.go
new file mode 100644
index 000000000..bc14f8ea2
--- /dev/null
+++ b/lib/dispatchcloud/worker_pool.go
@@ -0,0 +1,419 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+	"bytes"
+	"errors"
+	"io"
+	"net"
+	"sort"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"git.curoverse.com/arvados.git/lib/cloud"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"github.com/Sirupsen/logrus"
+	"golang.org/x/crypto/ssh"
+)
+
+const (
+	workerStateUnknown  workerState = iota // might be running a container already
+	workerStateBooting                     // instance is booting
+	workerStateRunning                     // instance is running
+	workerStateShutdown                    // worker has stopped monitoring the instance
+
+	// TODO: configurable
+	maxPingFailTime = 10 * time.Minute
+	maxBootTime     = 20 * time.Minute
+)
+
+type workerPool struct {
+	logger   *logrus.FieldLogger
+	provider cloud.Provider
+
+	subscribers map[chan<- struct{}]bool
+	creating    map[arvados.InstanceType]int
+	workers     map[cloud.InstanceID]worker
+	loaded      bool
+	mtx         sync.RWMutex
+}
+
+func (wp *workerPool) Start() {
+	wp.setupOnce.Do(wp.setup)
+}
+
+// Subscribe returns a channel that becomes ready whenever a worker's
+// state changes.
+//
+// Example:
+//
+//	ch := wp.Subscribe()
+//	for range ch {
+//		// some worker has become available; try scheduling some work
+//		if wantStop {
+//			wp.Unsubscribe(ch)
+//			break
+//		}
+//	}
+func (wp *workerPool) Subscribe() <-chan struct{} {
+	wp.Start()
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
+	ch := make(chan struct{}, 1)
+	wp.subscribers[ch] = true
+	return ch
+}
+
+// Unsubscribe stops sending updates to the given channel, and closes
+// it.
+func (wp *workerPool) Unsubscribe(ch <-chan struct{}) {
+	wp.Start()
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
+	if _, ok := wp.subscribers[ch]; ok {
+		delete(wp.subscribers, ch)
+		close(ch)
+	}
+}
+
+// Pending returns the number of unallocated (booting + idle +
+// unknown) instances of the given type.
+func (wp *workerPool) Pending(it arvados.InstanceType) int {
+	wp.Start()
+	wp.mtx.RLock()
+	defer wp.mtx.RUnlock()
+	n := 0
+	for _, wkr := range wp.workers {
+		state, running := wkr.State()
+		if wkr.Type() == it && len(running) == 0 && (state == workerStateRunning || state == workerStateBooting || state == workerStateUnknown) {
+			n++
+		}
+	}
+	return n + wp.creating[it]
+}
+
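+// Create a new instance with the given type, and add it to the worker
+// pool. Create returns immediately: the new instance is counted by
+// Pending right away, but instance creation runs in the background
+// and the worker is added to the pool only after it succeeds.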
+func (wp *workerPool) Create(it arvados.InstanceType) error {
+	wp.Start()
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
+	tags := cloud.InstanceTags{"InstanceType": it.Name}
+	wp.creating[it]++
+	go func() {
+		inst, err := wp.provider.Create(it, wp.imageID, tags, nil)
+		wp.mtx.Lock()
+		defer wp.mtx.Unlock()
+		wp.creating[it]--
+		if err != nil {
+			wp.logger.Errorf("workerPool: create instance: %s", err)
+			return
+		}
+		if wp.workers[inst.ID()] == nil {
+			wp.workers[inst.ID()] = newSSHWorker(inst, it, workerStateBooting, wp.notify)
+		}
+	}()
+	return nil
+}
+
+// Shutdown shuts down a worker with the given type, or returns false
+// if all workers with the given type are busy.
+//
+// Only workers with no running containers are considered. A worker
+// that is still booting is chosen in preference to one that is
+// running. At most one worker is shut down per call.
+func (wp *workerPool) Shutdown(it arvados.InstanceType) bool {
+	wp.Start()
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
+	for _, tryState := range []workerState{workerStateBooting, workerStateRunning} {
+		for _, wkr := range wp.workers {
+			if state, running := wkr.State(); state != tryState || len(running) > 0 {
+				continue
+			}
+			// Use a distinct name for the worker's type so
+			// it does not shadow the requested type.
+			if _, wt := wkr.Instance(); wt != it {
+				continue
+			}
+			wkr.Shutdown()
+			return true
+		}
+	}
+	return false
+}
+
+// StartContainer starts a container on an idle worker immediately if
+// possible, otherwise returns false.
+//
+// Candidates are the workers whose instance type matches
+// ctr.wantsType and whose state is workerStateRunning. They are
+// tried in order of most recent busy time first. The first worker
+// that accepts the container wins.
+func (wp *workerPool) StartContainer(ctr *container) bool {
+	wp.Start()
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
+	var avail []worker
+	for _, wkr := range wp.workers {
+		if _, it := wkr.Instance(); it != ctr.wantsType {
+			continue
+		}
+		// State returns two values (state and running
+		// containers); only the state matters here.
+		if state, _ := wkr.State(); state != workerStateRunning {
+			continue
+		}
+		avail = append(avail, wkr)
+	}
+	// Prefer workers with shorter idle times
+	sort.Slice(avail, func(i, j int) bool {
+		return avail[i].busy.After(avail[j].busy)
+	})
+	for _, wkr := range avail {
+		if wkr.StartContainer(ctr) {
+			wp.logger.Debugf("workerPool: worker %s accepted container %s", wkr, ctr)
+			return true
+		}
+	}
+	return false
+}
+
+// setup initializes the pool's notification channel and maps, then
+// starts two background goroutines: handleWorkerNotify, which relays
+// worker notifications to subscribers, and syncLoop, which
+// periodically reloads the provider's instance list.
+func (wp *workerPool) setup() {
+	wp.notify = make(chan worker, 1)
+	// Keyed by arvados.InstanceType, matching the declared type
+	// of the creating field.
+	wp.creating = map[arvados.InstanceType]int{}
+	wp.workers = map[cloud.InstanceID]worker{}
+	wp.subscribers = map[chan<- struct{}]bool{}
+
+	go wp.handleWorkerNotify()
+	go wp.syncLoop()
+}
+
+// handleWorkerNotify relays each notification received on wp.notify
+// to every subscriber. It runs until wp.notify is closed.
+//
+// Sends are non-blocking: a subscriber whose channel cannot accept a
+// value right now is skipped.
+func (wp *workerPool) handleWorkerNotify() {
+	// Which worker changed is not passed on to subscribers, so the
+	// received value is discarded.
+	for range wp.notify {
+		wp.mtx.RLock()
+		for ch := range wp.subscribers {
+			select {
+			case ch <- struct{}{}:
+			default:
+			}
+		}
+		wp.mtx.RUnlock()
+	}
+}
+
+// syncLoop repeatedly fetches the provider's instance list and
+// passes it to sync. It never returns.
+//
+// The first fetch happens immediately. After a successful fetch the
+// next one follows a minute later; after a failed fetch the loop
+// retries in 15 seconds.
+func (wp *workerPool) syncLoop() {
+	var wait time.Duration
+	for {
+		wp.logger.Debugf("workerPool: wait %s", wait)
+		time.Sleep(wait)
+		wait = time.Minute
+		wp.logger.Debugf("workerPool: getting instance list")
+		// Taken before the request so sync can tell instances
+		// that disappeared from workers that were busy after the
+		// list was requested.
+		threshold := time.Now()
+		// Passing no tags requests the full, unfiltered list;
+		// sync decides which instances to ignore.
+		instances, err := wp.provider.Instances(nil)
+		if err != nil {
+			wp.logger.Warnf("workerPool: error getting instance list: %s", err)
+			wait = 15 * time.Second
+			continue
+		}
+		wp.sync(threshold, instances)
+	}
+}
+
+func (wp *workerPool) sync(threshold time.Time, instances []cloud.Instance) {
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
+
+	updated := map[cloud.InstanceID]bool{}
+
+	for _, inst := range instances {
+		itTag := inst.Tags()["InstanceType"]
+		it, ok := wp.Cluster.InstanceTypes[itTag]
+		if !ok {
+			wp.logger.Debugf("workerPool: instance %s has InstanceType tag %q --- ignoring", inst, itTag)
+			continue
+		}
+		id := inst.ID()
+		if wkr := wp.workers[id]; wkr != nil {
+			wkr.instance = inst
+		} else {
+			wp.workers[id] = &worker{workerStateUnknown, inst, it}
+		}
+		updated[id] = true
+	}
+
+	for id, wkr := range wp.workers {
+		if updated[id] {
+			continue
+		}
+		inst := wkr.Instance()
+		if inst != nil && wkr.busy.Before(threshold) {
+			state, _ := wkr.State()
+			wp.logger.Infof("workerPool: instance %s disappeared, shutting down worker with state %s", inst, state)
+			wkr.Shutdown()
+			delete(wp.workers, id)
+		}
+	}
+
+	if !wp.loaded {
+		wp.loaded = true
+		wp.logger.Infof("workerPool: loaded initial set of instances (%d) from provider", len(wp.workers))
+	}
+}
+
+func (wp *workerPool) createInstanceFunc(it arvados.InstanceType) func() (cloud.Instance, error) {
+}
+
+// should be called in a new goroutine
+func (wp *workerPool) probeAndUpdate(wkr *worker) {
+	wp.mtx.Lock()
+	updated := wkr.updated
+	wp.mtx.Unlock()
+
+	booted, uuids, err := wkr.probe()
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
+	if err != nil {
+		if wkr.state != workerStateShutdown {
+			elapsed := time.Since(wkr.probed)
+			wkr.logger.Infof("worker: instance %s not responding for %s: %s", inst, elapsed, err)
+
+			label, threshold := "", maxPingFailTime
+			if wkr.state == workerStateBooting {
+				label, threshold = "new ", maxBootTime
+			}
+			if elapsed > threshold {
+				wkr.logger.Warnf("worker: %sinstance %s unresponsive since %s; shutting down", label, inst, wkr.probed)
+				inst.Destroy()
+			}
+		}
+		return
+	}
+	wkr.probed = time.Now()
+	if len(uuids) > 0 {
+		wkr.busy = time.Now()
+	}
+	if wkr.state == workerStateShutdown {
+	} else if booted {
+		wkr.state = workerStateRunning
+	} else {
+		wkr.state = workerStateBooting
+	}
+	if starts == wkr.starts {
+		// We haven't started any new work since starting the
+		// probe, so this is the latest available information.
+		wkr.running = uuids
+	}
+	wp.notify <- wkr
+}
+
+type worker struct {
+	state       workerState
+	instance    cloud.Instance
+	instType    arvados.InstanceType
+	probed      time.Time
+	updated     time.Time
+	busy        time.Time
+	verifiedKey atomic.Value
+	client      *ssh.Client
+	clientErr   error
+	clientOnce  sync.Once
+	clientSetup chan bool
+}
+
+// Create a new SSH session. If session setup fails or the SSH client
+// hasn't been set up yet, set up a new SSH client and try again.
+func (wkr *worker) newSession() (*ssh.Session, error) {
+	try := func(create bool) (*ssh.Session, error) {
+		client, err := wkr.sshClient(create)
+		if err != nil {
+			return nil, err
+		}
+		return client.NewSession()
+	}
+	session, err := try(false)
+	if err != nil {
+		session, err = try(true)
+	}
+	return session, err
+}
+
+// Get the latest SSH client. If another goroutine is in the process
+// of setting one up, wait for it to finish and return its result (or
+// the last successfully setup client, if it fails).
+func (wkr *worker) sshClient(create bool) (*ssh.Client, error) {
+	wkr.clientOnce.Do(func() {
+		wkr.clientSetup = make(chan bool, 1)
+		wkr.clientErr = errors.New("client not yet created")
+	})
+	defer func() { <-wkr.clientSetup }()
+	select {
+	case wkr.clientSetup <- true:
+		if create {
+			client, err := wkr.setupSSHClient()
+			if err == nil || wkr.client == nil {
+				wkr.client, wkr.clientErr = client, err
+			}
+			if err != nil {
+				return nil, err
+			}
+		}
+	default:
+		// Another goroutine is doing the above case.  Wait
+		// for it to finish and return whatever it leaves in
+		// wkr.client.
+		wkr.clientSetup <- true
+	}
+	return wkr.client, wkr.clientErr
+}
+
+// Create a new SSH client.
+//
+// setupSSHClient connects to the instance's current address and
+// records the host key presented during the handshake. If that key
+// differs from the last verified key (or no key has been verified
+// yet), the instance is asked to verify it and, on success, the key
+// is remembered in wkr.verifiedKey. On any failure the connection is
+// closed and an error is returned.
+func (wkr *worker) setupSSHClient() (*ssh.Client, error) {
+	addr := wkr.instance.Address()
+	if addr == "" {
+		return nil, errors.New("instance has no address")
+	}
+	var receivedKey ssh.PublicKey
+	client, err := ssh.Dial("tcp", addr, &ssh.ClientConfig{
+		User: "root",
+		Auth: []ssh.AuthMethod{
+			// FIXME: hard-coded placeholder password; must be
+			// replaced with real credentials before use.
+			ssh.Password("1234"),
+		},
+		// Accept any key during the handshake. It is checked
+		// below, where the established connection is available
+		// to the verifier.
+		HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error {
+			receivedKey = key
+			return nil
+		},
+		Timeout: time.Minute,
+	})
+	if err != nil {
+		return nil, err
+	} else if receivedKey == nil {
+		client.Close()
+		return nil, errors.New("BUG: key was never provided to HostKeyCallback")
+	}
+
+	existingKey, _ := wkr.verifiedKey.Load().(ssh.PublicKey)
+	if existingKey == nil || !bytes.Equal(existingKey.Marshal(), receivedKey.Marshal()) {
+		// NOTE(review): cloud.Instance declares VerifyPublicKey
+		// with a net.Dialer parameter, but client.Dial is a
+		// method value -- confirm the intended parameter type.
+		err = wkr.instance.VerifyPublicKey(receivedKey, client.Dial)
+		if err != nil {
+			// Don't leak a connection to a host we could
+			// not verify.
+			client.Close()
+			return nil, err
+		}
+		wkr.verifiedKey.Store(receivedKey)
+	}
+	return client, nil
+}
+
+// execute runs cmd on the instance in a new SSH session and returns
+// everything the command wrote to stdout and stderr, along with the
+// error (if any) from running it. If stdin is not nil, it is
+// connected to the command's standard input.
+//
+// If no session can be created, both output slices are nil.
+func (wkr *worker) execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
+	session, err := wkr.newSession()
+	if err != nil {
+		return nil, nil, err
+	}
+	defer session.Close()
+	var stdout, stderr bytes.Buffer
+	// The session needs io.Writers, so it gets pointers to the
+	// buffers.
+	session.Stdout = &stdout
+	session.Stderr = &stderr
+	if stdin != nil {
+		session.Stdin = stdin
+	}
+	logrus.Debugf("ssh: %s: running command: %s", wkr.instance, cmd)
+	err = session.Run(cmd)
+	return stdout.Bytes(), stderr.Bytes(), err
+}
+
+// probe runs "crunch-run --probe" on the instance.
+//
+// If the command cannot be run or exits with an error, probe returns
+// that error with booted=false and running=nil. Otherwise booted is
+// true and running holds the lines of the command's standard output
+// (presumably one container UUID per line -- confirm against
+// crunch-run). Empty output yields a nil slice, so callers can use
+// len(running) == 0 to detect an idle worker.
+func (wkr *worker) probe() (booted bool, running []string, err error) {
+	stdout, _, err := wkr.execute("crunch-run --probe", nil)
+	if err != nil {
+		return false, nil, err
+	}
+	// Splitting an empty string would produce one empty entry,
+	// making an idle worker look busy.
+	if out := strings.TrimRight(string(stdout), "\n"); out != "" {
+		running = strings.Split(out, "\n")
+	}
+	return true, running, nil
+}
diff --git a/sdk/go/arvados/container.go b/sdk/go/arvados/container.go
index 210ed9981..ef3547048 100644
--- a/sdk/go/arvados/container.go
+++ b/sdk/go/arvados/container.go
@@ -18,7 +18,7 @@ type Container struct {
 	Mounts               map[string]Mount     `json:"mounts"`
 	Output               string               `json:"output"`
 	OutputPath           string               `json:"output_path"`
-	Priority             int                  `json:"priority"`
+	Priority             int64                `json:"priority"`
 	RuntimeConstraints   RuntimeConstraints   `json:"runtime_constraints"`
 	State                ContainerState       `json:"state"`
 	SchedulingParameters SchedulingParameters `json:"scheduling_parameters"`

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list