[ARVADOS] updated: 1.2.0-9-g7fef859a2

Git user git at public.curoverse.com
Thu Aug 23 16:47:53 EDT 2018


Summary of changes:
 build/run-tests.sh                                 |   1 +
 lib/cloud/interfaces.go                            |   7 +-
 .../{worker_pool.go => worker/pool.go}             | 180 ++++++++++++++-------
 lib/dispatchcloud/worker/pool_test.go              |  49 ++++++
 4 files changed, 175 insertions(+), 62 deletions(-)
 rename lib/dispatchcloud/{worker_pool.go => worker/pool.go} (65%)
 create mode 100644 lib/dispatchcloud/worker/pool_test.go

  discards  72c5f4651e75f0833f133550245f68405a7ad161 (commit)
       via  7fef859a22f5c27a4e5e08a9f7c9559dc5daca3d (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the old revision is not a strict subset of the new revision.  This
situation occurs when you --force push a change and generate a repository
containing something like this:

 * -- * -- B -- O -- O -- O (72c5f4651e75f0833f133550245f68405a7ad161)
            \
             N -- N -- N (7fef859a22f5c27a4e5e08a9f7c9559dc5daca3d)

When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 7fef859a22f5c27a4e5e08a9f7c9559dc5daca3d
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Aug 23 16:47:45 2018 -0400

    13964: sketch
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/build/run-build-packages.sh b/build/run-build-packages.sh
index caebac013..11588913e 100755
--- a/build/run-build-packages.sh
+++ b/build/run-build-packages.sh
@@ -295,6 +295,8 @@ package_go_binary cmd/arvados-server arvados-server \
     "Arvados server daemons"
 package_go_binary cmd/arvados-server arvados-controller \
     "Arvados cluster controller daemon"
+package_go_binary cmd/arvados-server arvados-dispatch-cloud \
+    "Arvados cluster cloud dispatch"
 package_go_binary sdk/go/crunchrunner crunchrunner \
     "Crunchrunner executes a command inside a container and uploads the output"
 package_go_binary services/arv-git-httpd arvados-git-httpd \
diff --git a/build/run-tests.sh b/build/run-tests.sh
index e669e326c..93d7834a6 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -77,6 +77,7 @@ lib/cmd
 lib/controller
 lib/crunchstat
 lib/dispatchcloud
+lib/dispatchcloud/worker
 services/api
 services/arv-git-httpd
 services/crunchstat
diff --git a/cmd/arvados-server/arvados-dispatch-cloud.service b/cmd/arvados-server/arvados-dispatch-cloud.service
new file mode 100644
index 000000000..5ea5d45e7
--- /dev/null
+++ b/cmd/arvados-server/arvados-dispatch-cloud.service
@@ -0,0 +1,28 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+[Unit]
+Description=Arvados cloud dispatch
+Documentation=https://doc.arvados.org/
+After=network.target
+AssertPathExists=/etc/arvados/config.yml
+
+# systemd==229 (ubuntu:xenial) obeys StartLimitInterval in the [Unit] section
+StartLimitInterval=0
+
+# systemd>=230 (debian:9) obeys StartLimitIntervalSec in the [Unit] section
+StartLimitIntervalSec=0
+
+[Service]
+Type=notify
+EnvironmentFile=-/etc/arvados/environment
+ExecStart=/usr/bin/arvados-dispatch-cloud
+Restart=always
+RestartSec=1
+
+# systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section
+StartLimitInterval=0
+
+[Install]
+WantedBy=multi-user.target
diff --git a/cmd/arvados-server/cmd.go b/cmd/arvados-server/cmd.go
index 1af3745df..cd15d25dd 100644
--- a/cmd/arvados-server/cmd.go
+++ b/cmd/arvados-server/cmd.go
@@ -9,6 +9,7 @@ import (
 
 	"git.curoverse.com/arvados.git/lib/cmd"
 	"git.curoverse.com/arvados.git/lib/controller"
+	"git.curoverse.com/arvados.git/lib/dispatchcloud"
 )
 
 var (
@@ -18,7 +19,8 @@ var (
 		"-version":  cmd.Version(version),
 		"--version": cmd.Version(version),
 
-		"controller": controller.Command,
+		"controller":     controller.Command,
+		"dispatch-cloud": dispatchcloud.Command,
 	})
 )
 
diff --git a/lib/cloud/interfaces.go b/lib/cloud/interfaces.go
new file mode 100644
index 000000000..33b9e7fa9
--- /dev/null
+++ b/lib/cloud/interfaces.go
@@ -0,0 +1,90 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package cloud
+
+import (
+	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"golang.org/x/crypto/ssh"
+)
+
+// A RateLimitError should be returned by a Provider when the cloud
+// service indicates it is rejecting all API calls for some time
+// interval.
+type RateLimitError interface {
+	// Time before which the caller should expect requests to
+	// fail.
+	EarliestRetry() time.Time
+	error
+}
+
+// A QuotaError should be returned by a Provider when the cloud
+// service indicates the account cannot create more VMs than already
+// exist.
+type QuotaError interface {
+	// If true, don't create more instances until some existing
+	// instances are destroyed. If false, don't handle the error
+	// as a quota error.
+	IsQuotaError() bool
+	error
+}
+
+type InstanceTags map[string]string
+type InstanceID string
+type ImageID string
+
+// Instance is implemented by the provider-specific instance types.
+type Instance interface {
+	// ID returns the provider's instance ID. It must be stable
+	// for the life of the instance.
+	ID() InstanceID
+
+	// String typically returns the cloud-provided instance ID.
+	String() string
+
+	// Cloud provider's "instance type" ID. Matches a ProviderType
+	// in the cluster's InstanceTypes configuration.
+	ProviderType() string
+
+	// Get current tags
+	Tags() InstanceTags
+
+	// Replace tags with the given tags
+	SetTags(InstanceTags) error
+
+	// Shut down the node
+	Destroy() error
+
+	// SSH server hostname or IP address, or empty string if
+	// unknown while instance is booting.
+	Address() string
+
+	// Return nil if the given public key matches the instance's
+	// SSH server key. If the provided Dialer is not nil,
+	// VerifyPublicKey can use it to make outgoing network
+	// connections from the instance -- e.g., to use the cloud's
+	// "this instance's metadata" API.
+	VerifyPublicKey(ssh.PublicKey, *ssh.Client) error
+}
+
+type Provider interface {
+	// Create a new instance. If supported by the driver, add the
+	// provided public key to /root/.ssh/authorized_keys.
+	//
+	// The returned error should implement RateLimitError and
+	// QuotaError where applicable.
+	Create(arvados.InstanceType, ImageID, InstanceTags, ssh.PublicKey) (Instance, error)
+
+	// Return all instances, including ones that are booting or
+	// shutting down. Optionally, filter out nodes that don't have
+	// all of the given InstanceTags (the caller will ignore these
+	// anyway).
+	Instances(InstanceTags) ([]Instance, error)
+
+	Stop()
+}
+
+type ProviderFactory func(instancePoolID string) Provider
diff --git a/lib/dispatchcloud/cmd.go b/lib/dispatchcloud/cmd.go
new file mode 100644
index 000000000..b2bc91300
--- /dev/null
+++ b/lib/dispatchcloud/cmd.go
@@ -0,0 +1,17 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+	"git.curoverse.com/arvados.git/lib/cmd"
+	"git.curoverse.com/arvados.git/lib/service"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+var Command cmd.Handler = service.Command(arvados.ServiceNameDispatchCloud, newHandler)
+
+func newHandler(cluster *arvados.Cluster, np *arvados.NodeProfile) service.Handler {
+	return &dispatcher{Cluster: cluster, NodeProfile: np}
+}
diff --git a/lib/dispatchcloud/container_queue.go b/lib/dispatchcloud/container_queue.go
new file mode 100644
index 000000000..f8c0512b6
--- /dev/null
+++ b/lib/dispatchcloud/container_queue.go
@@ -0,0 +1,201 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+	"sync"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+type containerQueue struct {
+	Logger  logger
+	Cluster *arvados.Cluster
+	Client  *arvados.Client
+
+	current   map[string]*arvados.Container
+	mtx       sync.Mutex
+	keeplocal map[string]struct{}
+}
+
+func (cq *containerQueue) Forget(uuid string) {
+	cq.mtx.Lock()
+	defer cq.mtx.Unlock()
+	ctr := cq.current[uuid]
+	if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled {
+		delete(cq.current, uuid)
+	}
+}
+
+func (cq *containerQueue) Get(uuid) (arvados.Container, bool) {
+	cq.mtx.Lock()
+	defer cq.mtx.Unlock()
+	if ctr, ok := cq.current[uuid]; !ok {
+		return arvados.Container{}, false
+	} else {
+		return *ctr
+	}
+}
+
+func (cq *containerQueue) All() map[string]arvados.Container {
+	cq.mtx.Lock()
+	defer cq.mtx.Unlock()
+	ret := make(map[string]*arvados.Container, len(cq.current))
+	for uuid, ctr := range cq.current {
+		ret[uuid] = *ctr
+	}
+	return ret
+}
+
+func (cq *containerQueue) setup() {
+	cq.current.Store(map[string]*arvados.Container{})
+}
+
+func (cq *containerQueue) update() error {
+	cq.mtx.Lock()
+	cq.keeplocal = map[string]struct{}{}
+	cq.mtx.Unlock()
+
+	defer func() { cq.mtx.Lock(); cq.keeplocal = nil; cq.mtx.Unlock() }()
+
+	next, err := cq.poll()
+	if err != nil {
+		return err
+	}
+	cq.mtx.Lock()
+	defer cq.mtx.Unlock()
+	for uuid := range cq.keeplocal {
+		next[uuid] = cq.current[uuid]
+	}
+	cq.current = next
+}
+
+func (cq *containerQueue) Lock(uuid string) error {
+	return cq.apiUpdate(uuid, "lock")
+}
+
+func (cq *containerQueue) Unlock(uuid string) error {
+	return cq.apiUpdate(uuid, "unlock")
+}
+
+func (cq *containerQueue) apiUpdate(uuid, action string) error {
+	var resp arvados.Container
+	err := cq.Client.RequestAndDecode(&resp, "POST", "arvados/v1/containers/"+uuid+"/"+action, nil, nil)
+	if err != nil {
+		return err
+	}
+
+	cq.mtx.Lock()
+	defer cq.mtx.Unlock()
+	if cq.keeplocal != nil {
+		cq.keeplocal[uuid] = struct{}{}
+	}
+	if ctr, ok := cq.current[uuid]; !ok {
+		cq.current[uuid] = &resp
+	} else {
+		ctr.State, ctr.Priority, ctr.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID
+	}
+	return nil
+}
+
+func (cq *containerQueue) poll() (map[string]*arvados.Container, error) {
+	cq.mtx.Lock()
+	size := len(cq.current)
+	cq.mtx.Unlock()
+
+	next := make(map[string]*arvados.Container, size)
+	apply := func(updates []arvados.Container) {
+		for _, upd := range updates {
+			if next[ctr.UUID] == nil {
+				next[ctr.UUID] = &arvados.Container{}
+			}
+			*next[ctr.UUID] = upd
+		}
+	}
+	selectParam := []string{"uuid", "state", "priority"}
+	limitParam := 1000
+
+	mine, err := cq.fetchAll(arvados.ResourceListParams{
+		Select:  selectParam,
+		Order:   []string{"uuid"},
+		Limit:   &limitParam,
+		Count:   "none",
+		Filters: {{"locked_by_uuid", "=", cq.authUUID}},
+	})
+	if err != nil {
+		return nil, err
+	}
+	apply(mine)
+
+	avail, err := cq.fetchAll(arvados.ResourceListParams{
+		Select:  selectParam,
+		Order:   []string{"uuid"},
+		Limit:   &limitParam,
+		Count:   "none",
+		Filters: {{"state", "=", Queued}, {"priority", ">", "0"}},
+	})
+	if err != nil {
+		return err
+	}
+	apply(avail)
+
+	var missing []string
+	cq.mtx.Lock()
+	for uuid, ctr := range cq.current {
+		if next[uuid] == nil &&
+			ctr.State != arvados.ContainerStateCancelled &&
+			ctr.state != arvados.ContainerStateComplete {
+			missing = append(missing, uuid)
+		}
+	}
+	cq.mtx.Unlock()
+
+	for i, page := 0, 20; i < len(missing); i += page {
+		batch := missing[i:]
+		if len(batch) > page {
+			batch = batch[:page]
+		}
+		ended, err := cq.fetchAll(arvados.ResourceListParams{
+			Select:  selectParam,
+			Order:   []string{"uuid"},
+			Count:   "none",
+			Filters: {{"uuid", "in", batch}},
+		})
+		if err != nil {
+			return err
+		}
+		apply(ended)
+	}
+	return next, nil
+}
+
+func (cq *containerQueue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.Container, error) {
+	var results []arvados.Container
+	params := initialParams
+	params.Offset = 0
+	for {
+		// This list variable must be a new one declared
+		// inside the loop: otherwise, items in the API
+		// response would get deep-merged into the items
+		// loaded in previous iterations.
+		var list arvados.ContainerList
+
+		err := cq.Client.RequestAndDecode(&list, "GET", "arvados/v1/containers", nil, params)
+		if err != nil {
+			return nil, err
+		}
+		if len(list.Items) == 0 {
+			break
+		}
+
+		results = append(results, list.Items)
+		if len(params.Order) == 1 && params.Order[0] == "uuid" {
+			params.Filters = append(initialParams.Filters, []interface{}{"uuid", ">", list.Items[len(list.Items)-1].UUID})
+		} else {
+			params.Offset += len(list.Items)
+		}
+	}
+	return results, nil
+}
diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
new file mode 100644
index 000000000..90e2c799f
--- /dev/null
+++ b/lib/dispatchcloud/dispatcher.go
@@ -0,0 +1,61 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+	"net/http"
+
+	"git.curoverse.com/arvados.git/lib/cloud"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"github.com/Sirupsen/logrus"
+)
+
+type dispatcher struct {
+	Cluster     *arvados.Cluster
+	NodeProfile *arvados.NodeProfile
+
+	logger         logger
+	provider       cloud.Provider
+	workerPool     workerPool
+	queue          containerQueue
+	scheduler      scheduler
+	syncer         syncer
+	staleLockFixer staleLockFixer
+	httpHandler    http.Handler
+}
+
+// ServeHTTP implements service.Handler.
+func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	disp.setupOnce.Do(disp.setup)
+	disp.handler.ServeHTTP(w, r)
+}
+
+// CheckHealth implements service.Handler.
+func (disp *dispatcher) CheckHealth() error {
+	disp.setupOnce.Do(disp.setup)
+	return nil
+}
+
+func (disp *dispatcher) setup() {
+	disp.logger = logrus.StandardLogger()
+	disp.provider = &providerProxy{disp.logger, newProvider(disp.Cluster)}
+	disp.workerPool = &workerPool{disp.logger, disp.provider}
+	disp.queue = &containerQueue{disp.logger, disp.Cluster}
+	disp.scheduler = &scheduler{disp.logger, disp.queue, disp.workerPool}
+	disp.syncer = &syncer{disp.logger, disp.queue, disp.workerPool}
+	disp.staleLockFixer = &staleLockFixer{disp.logger, disp.queue, disp.workerPool}
+
+	go func() {
+		disp.workerPool.Start()
+		// staleLockFixer must be ready before scheduler can start
+		disp.staleLockFixer.Wait()
+		go disp.scheduler.Run()
+		go disp.syncer.Run()
+	}()
+
+	mux := http.NewServeMux()
+	mux.Handle("/status.json", disp.serveStatusJSON)
+	disp.httpHandler = mux
+}
diff --git a/lib/dispatchcloud/logger.go b/lib/dispatchcloud/logger.go
new file mode 100644
index 000000000..90bb6ca68
--- /dev/null
+++ b/lib/dispatchcloud/logger.go
@@ -0,0 +1,29 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+	"sync"
+	"time"
+)
+
+type logger interface {
+	Printf(string, ...interface{})
+	Warnf(string, ...interface{})
+	Debugf(string, ...interface{})
+}
+
+var nextSpam = map[string]time.Time{}
+var nextSpamMtx sync.Mutex
+
+func unspam(msg string) bool {
+	nextSpamMtx.Lock()
+	defer nextSpamMtx.Unlock()
+	if nextSpam[msg].Before(time.Now()) {
+		nextSpam[msg] = time.Now().Add(time.Minute)
+		return true
+	}
+	return false
+}
diff --git a/lib/dispatchcloud/readme.go b/lib/dispatchcloud/readme.go
new file mode 100644
index 000000000..a4b005eb8
--- /dev/null
+++ b/lib/dispatchcloud/readme.go
@@ -0,0 +1,79 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+// A dispatcher comprises a container queue, a scheduler, a worker
+// pool, a cloud provider, a stale-lock fixer, and a syncer.
+// 1. Choose a provider.
+// 2. Start a worker pool.
+// 3. Start a container queue.
+// 4. Run a stale-lock fixer.
+// 5. Start a scheduler.
+// 6. Start a syncer.
+//
+//
+// A provider (cloud driver) creates new cloud VM instances and gets
+// the latest list of instances. The returned instances implement
+// proxies to the provider's metadata and control interfaces (get IP
+// address, update tags, shutdown).
+//
+//
+// A workerPool tracks workers' instance types and readiness states
+// (available to do work now, booting, suffering a temporary network
+// outage, shutting down). It loads internal state from the cloud
+// provider's list of instances at startup, and syncs periodically
+// after that.
+//
+//
+// A worker maintains a multiplexed SSH connection to a cloud
+// instance, retrying/reconnecting as needed, so the workerPool can
+// execute commands. It asks the provider's instance to verify its SSH
+// public key once when first connecting, and again later if the key
+// changes.
+//
+//
+// A container queue tracks the known state (according to
+// arvados-controller) of each container of interest -- i.e., queued,
+// or locked/running using our own dispatch token. It also proxies the
+// dispatcher's lock/unlock/cancel requests to the controller. It
+// handles concurrent refresh and update operations without exposing
+// out-of-order updates to its callers. (It drops any new information
+// that might have originated before its own most recent
+// lock/unlock/cancel operation.)
+//
+//
+// A stale-lock fixer waits for any already-locked containers (i.e.,
+// locked by a prior server process) to appear on workers as the
+// worker pool recovers its state. It unlocks/requeues any that still
+// remain when all workers are recovered or shutdown, or its timer
+// expires.
+//
+//
+// A scheduler chooses which containers to assign to which idle
+// workers, and decides what to do when there are not enough idle
+// workers (including shutting down some idle nodes).
+//
+//
+// A syncer updates state to Cancelled when a running container
+// process dies without finalizing its entry in the controller
+// database. It also calls the worker pool to kill containers that
+// have priority=0 while locked or running.
+//
+//
+// A provider proxy wraps a provider with rate-limiting logic. After
+// the wrapped provider receives a cloud.RateLimitError, the proxy
+// starts returning errors to callers immediately without calling
+// through to the wrapped provider.
+//
+//
+// TBD: Bootstrapping script via SSH, too? Future version.
+//
+// TBD: drain instance, keep instance alive
+// TBD: metrics, diagnostics
+// TBD: why dispatch token currently passed to worker?
+//
+// Metrics: queue size, time job has been in queued, #idle/busy/booting nodes
+// Timing in each step, and end-to-end
+// Metrics: boot/idle/alloc time and cost
diff --git a/lib/dispatchcloud/scheduler.go b/lib/dispatchcloud/scheduler.go
new file mode 100644
index 000000000..a7dd3f244
--- /dev/null
+++ b/lib/dispatchcloud/scheduler.go
@@ -0,0 +1,155 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+	"sort"
+	"sync"
+	"time"
+
+	"git.curoverse.com/arvados.git/lib/cloud"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+type container struct {
+	container arvados.Container
+	wantsType arvados.InstanceType
+}
+
+func (c *container) String() string {
+	return c.container.UUID
+}
+
+// A scheduler assigns queued containers to available workers, and
+// creates new workers when there aren't enough available.
+//
+// If it encounters problems creating new workers, the scheduler also
+// shuts down idle workers in case they are consuming quota.
+// Otherwise, workers are responsible for shutting themselves down
+// after the configured idle time threshold.
+type scheduler struct {
+	logger
+	containerQueue
+	workerPool
+
+	queue           []*container
+	queueMtx        sync.Mutex
+	instances       map[cloud.Instance]struct{}
+	enqueueOrUpdate chan container
+	runOnce         sync.Once
+}
+
+func (sched *scheduler) try() {
+	ctrs := sched.containerQueue.Current()
+	queue := make([]arvados.Container, 0, len(ctrs))
+	for uuid, ctr := range ctrs {
+		queue = append(queue, ctr)
+	}
+	sort.Slice(queue, func(i, j int) bool {
+		queue[i].Priority > queue[j].Priority
+	})
+	dibs := map[arvados.InstanceType]int{}
+	for _, ctr := range queue {
+		switch {
+		case ctr.State == arvados.ContainerStateQueued && ctr.Priority > 0:
+			it, err := ChooseInstanceType(sched.Cluster, &ctr)
+			if err != nil {
+				sched.logger.Warnf("cannot run %s", &ctr)
+				continue
+			}
+
+			if dibs[it] >= sched.workerPool.Pending(it) {
+				err := sched.workerPool.Create(it)
+				if err != nil {
+					if unspam(err.Error()) {
+						sched.logger.Warnf("scheduler: workerPool.Create: %s", err)
+					}
+					return
+				}
+			}
+			dibs[it]++
+			// ...
+		case ctr.State == arvados.ContainerStateLocked || ctr.State == arvados.ContainerStateRunning:
+			// ...
+		}
+	}
+}
+
+func (sched *scheduler) setup() {
+	sched.enqueueOrUpdate = make(chan container, 1)
+	go sched.run()
+}
+
+func (sched *scheduler) run() {
+	wakeup := make(chan struct{}, 1)
+	workers := map[*worker]*container{}
+	ctrs := map[string]*container{} // key is container UUID
+	timer := time.NewTimer(time.Second)
+	queue := []*container{}
+	for {
+		select {
+		case ctr := <-sched.enqueueOrUpdate:
+			// Get a newly queued container, or update
+			// priority/state.
+			if ctrs[ctr.UUID] == nil {
+				ctrs[ctr.UUID] = &ctr
+			} else {
+				ctrs[ctr.UUID].container = ctr.container
+				continue
+			}
+		case <-timer.C:
+		case <-wakeup:
+		}
+
+		queue = queue[:0]
+		for _, ctr := range ctrs {
+			if ctr.State == arvados.ContainerStateLocked {
+				queue = append(queue, ctr)
+			}
+		}
+		sort.Slice(queue, func(i, j int) bool {
+			if d := queue[i].Priority - queue[j].Priority; d != 0 {
+				return d > 0
+			} else {
+				return queue[i].UUID > queue[j].UUID
+			}
+		})
+
+		// Dispatch highest priority container to the idle
+		// worker with the shortest idle time.
+		for len(queue) > 0 {
+			select {
+			case todo[queue[0].wantsType] <- queue[0]:
+				queue = queue[1:]
+				continue
+			default:
+			}
+			break
+		}
+
+		// Compare queue to booting.
+	}
+}
+
+// DispatchFunc returns a dispatch.DispatchFunc.
+func (sched *scheduler) DispatchFunc(cluster *arvados.Cluster) func(actr arvados.Container, update <-chan arvados.Container) {
+	go sched.runOnce.Do(sched.run)
+	return func(actr arvados.Container, update <-chan arvados.Container) {
+		it, err := ChooseInstanceType(cluster, &actr)
+		if err != nil {
+			return err
+		}
+		sched.enqueueOrUpdate <- container{
+			container: actr,
+			wantsType: it,
+		}
+		for actr := range update {
+			sched.enqueueOrUpdate <- container{
+				container: actr,
+				wantsType: it,
+			}
+		}
+	}
+}
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
new file mode 100644
index 000000000..9e124cf52
--- /dev/null
+++ b/lib/dispatchcloud/worker/pool.go
@@ -0,0 +1,479 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+	"bytes"
+	"errors"
+	"io"
+	"net"
+	"sort"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"git.curoverse.com/arvados.git/lib/cloud"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"github.com/Sirupsen/logrus"
+	"golang.org/x/crypto/ssh"
+)
+
+type State int
+
+const (
+	StateUnknown  State = iota // might be running a container already
+	StateBooting               // instance is booting
+	StateRunning               // instance is running
+	StateShutdown              // worker has stopped monitoring the instance
+
+	// TODO: configurable
+	maxPingFailTime = 10 * time.Minute
+	maxBootTime     = 20 * time.Minute
+)
+
+var stateString = map[State]string{
+	StateUnknown:  "unknown",
+	StateBooting:  "booting",
+	StateRunning:  "running",
+	StateShutdown: "shutdown",
+}
+
+// String implements fmt.Stringer.
+func (s State) String() string {
+	return stateString[s]
+}
+
+type Pool struct {
+	Logger   logrus.FieldLogger
+	Provider cloud.Provider
+
+	subscribers map[chan<- struct{}]bool
+	creating    map[arvados.InstanceType]int // goroutines waiting for provider.Create to return
+	workers     map[cloud.InstanceID]worker
+	loaded      bool // loaded list of instances from provider at least once
+	stop        chan bool
+	mtx         sync.RWMutex
+}
+
+func (wp *Pool) Start() {
+	wp.setupOnce.Do(wp.setup)
+}
+
+// Subscribe returns a channel that becomes ready whenever a worker's
+// state changes.
+//
+// Example:
+//
+//	ch := wp.Subscribe()
+//	for range ch {
+//		// some worker has become available; try scheduling some work
+//		if wantStop {
+//			wp.Unsubscribe(ch)
+//			break
+//		}
+//	}
+func (wp *Pool) Subscribe() <-chan struct{} {
+	wp.Start()
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
+	ch := make(chan struct{}, 1)
+	wp.subscribers[ch] = true
+	return ch
+}
+
+// Unsubscribe stops sending updates to the given channel, and closes
+// it.
+func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
+	wp.Start()
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
+	if _, ok := wp.subscribers[ch]; ok {
+		delete(wp.subscribers, ch)
+		close(ch)
+	}
+}
+
+// Pending returns the number of unallocated (booting + idle +
+// unknown) instances of the given type.
+func (wp *Pool) Pending(it arvados.InstanceType) int {
+	wp.Start()
+	wp.mtx.RLock()
+	defer wp.mtx.RUnlock()
+	n := 0
+	for _, wkr := range wp.workers {
+		state, running := wkr.State()
+		if wkr.Type() == it && len(running) == 0 && (state == StateRunning || state == StateBooting || state == StateUnknown) {
+			n++
+		}
+	}
+	return n + wp.creating[it]
+}
+
+// Create a new instance with the given type, and add it to the worker
+// pool. The worker is added immediately; instance creation runs in
+// the background.
+func (wp *Pool) Create(it arvados.InstanceType) error {
+	wp.Start()
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
+	tags := cloud.InstanceTags{"InstanceType": it.Name}
+	wp.creating[it]++
+	go func() {
+		inst, err := wp.Provider.Create(it, wp.imageID, tags, nil)
+		wp.mtx.Lock()
+		defer wp.mtx.Unlock()
+		wp.creating[it]--
+		if err != nil {
+			wp.Logger.Errorf("worker.Pool: create instance: %s", err)
+			return
+		}
+		if wp.workers[inst.ID()] == nil {
+			wp.workers[inst.ID()] = &worker{wp.Logger, StateBooting, inst, it}
+		}
+	}()
+	return nil
+}
+
+// Shutdown shuts down a worker with the given type, or returns false
+// if all workers with the given type are busy.
+func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
+	wp.Start()
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
+	for _, tryState := range []State{StateBooting, StateRunning} {
+		for _, wkr := range wp.workers {
+			if state, running := wkr.State(); state != tryState || len(running) > 0 {
+				continue
+			}
+			if _, it := wkr.Instance(); wt != it {
+				continue
+			}
+			wkr.Shutdown()
+			return true
+		}
+	}
+	return false
+}
+
+// StartContainer starts a container on an idle worker immediately if
+// possible, otherwise returns false.
+func (wp *Pool) StartContainer(ctr *container) bool {
+	wp.Start()
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
+	var avail []worker
+	for _, wkr := range wp.workers {
+		if _, it := wkr.Instance(); it == ctr.wantsType && wkr.State() == StateRunning {
+			avail = append(avail, wkr)
+		}
+	}
+	// Prefer workers with shorter idle times
+	sort.Slice(avail, func(i, j int) {
+		return avail[i].busy.After(avail[j].busy)
+	})
+	for i, wkr := range avail {
+		if wkr.StartContainer(ctr) {
+			wp.Logger.Debugf("worker.Pool: worker %s accepted container %s", wkr, ctr)
+			return true
+		}
+	}
+	return false
+}
+
+// Stop synchronizing with the provider.
+func (wp *Pool) Stop() {
+	close(wp.stop)
+}
+
+func (wp *Pool) setup() {
+	wp.notify = make(chan worker, 1)
+	wp.creating = map[cloud.InstanceType]int{}
+	wp.workers = map[cloud.InstanceID]worker{}
+	wp.subscribers = map[chan<- struct{}]bool{}
+
+	go wp.handleWorkerNotify()
+	go wp.syncLoop()
+}
+
+func (wp *Pool) handleWorkerNotify() {
+	for {
+		select {
+		case <-wp.notify:
+		case <-wp.stop:
+			return
+		}
+		wp.mtx.RLock()
+		for ch := range wp.subscribers {
+			select {
+			case ch <- struct{}{}:
+			default:
+			}
+		}
+		wp.mtx.RUnlock()
+	}
+}
+
+func (wp *Pool) syncLoop() {
+	var wait time.Duration
+	timer := time.NewTimer(time.Second)
+	for {
+		if !timer.Stop() {
+			<-timer.C
+		}
+		timer.Reset(wait)
+		wp.Logger.Debugf("worker.Pool: wait %s", wait)
+		select {
+		case <-timer.C:
+		case <-wp.stop:
+			return
+		}
+		err := wp.Sync()
+		if err != nil {
+			wp.Logger.Warnf("worker.Pool: error getting instance list: %s", err)
+			wait = 15 * time.Second
+		} else {
+			wait = time.Minute
+		}
+	}
+}
+
+func (wp *Pool) Sync() error {
+	wp.Logger.Debugf("worker.Pool: getting instance list")
+	threshold := time.Now()
+	instances, err := wp.Provider.Instances()
+	if err != nil {
+		return err
+	}
+	wp.sync(threshold, instances)
+	return nil
+}
+
+func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
+
+	updated := map[cloud.InstanceID]bool{}
+
+	for _, inst := range instances {
+		itTag := inst.Tags()["InstanceType"]
+		it, ok := wp.Cluster.InstanceTypes[itTag]
+		if !ok {
+			wp.Logger.Debugf("worker.Pool: instance %s has InstanceType tag %q --- ignoring", inst, itTag)
+			continue
+		}
+		id := inst.ID()
+		if wkr := wp.workers[id]; wkr != nil {
+			wkr.instance = inst
+		} else {
+			wp.workers[id] = &worker{wp.Logger, StateUnknown, inst, it}
+		}
+		updated[id] = true
+	}
+
+	for id, wkr := range wp.workers {
+		if updated[id] {
+			continue
+		}
+		inst := wkr.Instance()
+		if inst != nil && wkr.busy.Before(threshold) {
+			state, _ := wkr.State()
+			wp.Logger.Infof("worker.Pool: instance %s disappeared, shutting down worker with state %s", inst, state)
+			wkr.Shutdown()
+			delete(wp.workers, id)
+		}
+	}
+
+	if !wp.loaded {
+		wp.loaded = true
+		wp.Logger.Infof("worker.Pool: loaded initial set of instances (%d) from provider", len(wp.workers))
+	}
+}
+
+// should be called in a new goroutine
+func (wp *Pool) probeAndUpdate(wkr *worker) {
+	wp.mtx.Lock()
+	updated := wkr.updated
+	booted := wkr.booted
+	wp.mtx.Unlock()
+
+	var err error
+	if !booted {
+		err = wkr.probeBooted(wp.BootProbeCommand)
+		if err == nil {
+			booted = true
+			wp.Logger.Infof("worker.Pool: instance %s booted", wkr.instance)
+		}
+	}
+	var running []string
+	if err == nil && booted {
+		running, err = wkr.probeRunning()
+	}
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
+	wkr.booted = booted
+	if err != nil {
+		if wkr.state != StateShutdown {
+			elapsed := time.Since(wkr.probed)
+			wp.Logger.Infof("worker.Pool: instance %s not responding for %s: %s", inst, elapsed, err)
+
+			label, threshold := "", maxPingFailTime
+			if wkr.state == StateBooting {
+				label, threshold = "new ", maxBootTime
+			}
+			if elapsed > threshold {
+				wp.Logger.Warnf("worker.Pool: %sinstance %s unresponsive since %s; shutting down", label, inst, wkr.probed)
+				inst.Destroy()
+			}
+		}
+		return
+	}
+	wkr.probed = time.Now()
+	if len(running) > 0 {
+		wkr.busy = time.Now()
+	}
+	if wkr.state == StateShutdown {
+	} else if booted {
+		wkr.state = StateRunning
+	} else {
+		wkr.state = StateBooting
+	}
+	if starts == wkr.starts {
+		// We haven't started any new work since starting the
+		// probe, so this is the latest available information.
+		wkr.running = running
+	}
+	wp.notify <- wkr
+}
+
+type worker struct {
+	logger      *logrus.FieldLogger
+	state       State
+	instance    cloud.Instance
+	instType    arvados.InstanceType
+	probed      time.Time
+	updated     time.Time
+	busy        time.Time
+	verifiedKey atomic.Value
+	client      *ssh.Client
+	clientErr   error
+	clientOnce  sync.Once
+	clientSetup chan bool
+}
+
+// Create a new SSH session. If session setup fails or the SSH client
+// hasn't been setup yet, setup a new SSH client and try again.
+func (wkr *worker) newSession() (*ssh.Session, error) {
+	try := func(create bool) (*ssh.Session, error) {
+		client, err := wkr.sshClient(create)
+		if err != nil {
+			return nil, err
+		}
+		return client.NewSession()
+	}
+	session, err := try(false)
+	if err != nil {
+		session, err = try(true)
+	}
+	return session, err
+}
+
+// Get the latest SSH client. If another goroutine is in the process
+// of setting one up, wait for it to finish and return its result (or
+// the last successfully setup client, if it fails).
+func (wkr *worker) sshClient(create bool) (*ssh.Client, error) {
+	wkr.clientOnce.Do(func() {
+		wkr.clientSetup = make(chan bool, 1)
+		wkr.clientErr = errors.New("client not yet created")
+	})
+	defer func() { <-wkr.clientSetup }()
+	select {
+	case wkr.clientSetup <- true:
+		if create {
+			client, err := wkr.setupSSHClient()
+			if err == nil || wkr.client == nil {
+				wkr.client, wkr.clientErr = client, err
+			}
+			if err != nil {
+				return nil, err
+			}
+		}
+	default:
+		// Another goroutine is doing the above case.  Wait
+		// for it to finish and return whatever it leaves in
+		// wkr.client.
+		wkr.clientSetup <- true
+	}
+	return wkr.client, wkr.clientErr
+}
+
+// Create a new SSH client.
+func (wkr *worker) setupSSHClient() (*ssh.Client, error) {
+	addr := instance.Address()
+	if addr == "" {
+		return nil, errors.New("instance has no address")
+	}
+	var receivedKey ssh.PublicKey
+	client, err := ssh.Dial("tcp", addr, &ssh.ClientConfig{
+		User: "root",
+		Auth: []ssh.AuthMethod{
+			ssh.Password("1234"),
+		},
+		HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error {
+			receivedKey = key
+			return nil
+		},
+		Timeout: time.Minute,
+	})
+	if err != nil {
+		return nil, err
+	} else if key == nil {
+		return nil, errors.New("BUG: key was never provided to HostKeyCallback")
+	}
+
+	existingKey, _ := wkr.publicKey.Load().(ssh.PublicKey)
+	if existingKey == nil || !bytes.Equal(existingKey.Marshal(), receivedKey.Marshal()) {
+		err = wkr.instance.VerifyPublicKey(receivedKey, client.Dial)
+		if err != nil {
+			return nil, err
+		}
+		wkr.publicKey.Store(receivedKey)
+	}
+	return client, nil
+}
+
+func (wkr *worker) execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
+	session, err := wkr.newSession()
+	if err != nil {
+		return nil, nil, err
+	}
+	defer session.Close()
+	var stdout, stderr bytes.Buffer
+	session.Stdout = stdout
+	session.Stderr = stderr
+	if stdin != nil {
+		session.Stdin = stdin
+	}
+	wkr.logger.Debugf("worker.execute: %s: running command: %s", sc.host, cmd)
+	err = session.Run(cmd)
+	return stdout.Bytes(), stderr.Bytes(), err
+}
+
+func (wkr *worker) probeRunning() (running []string, err error) {
+	stdout, stderr, err := wkr.execute("crunch-run --probe", nil)
+	if err != nil {
+		return
+	}
+	running = strings.Split(string(bytes.TrimRight("\n")), "\n")
+	return
+}
+
+func (wkr *worker) probeBooted(cmd string) error {
+	if cmd == "" {
+		cmd = "true"
+	}
+	_, _, err := wkr.execute(cmd, nil)
+	return err
+}
diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
new file mode 100644
index 000000000..30e9db6b5
--- /dev/null
+++ b/lib/dispatchcloud/worker/pool_test.go
@@ -0,0 +1,49 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"github.com/Sirupsen/logrus"
+	check "gopkg.in/check.v1"
+)
+
+const GiB arvados.ByteSize = 1 << 30
+
+var _ = check.Suite(&NodeSizeSuite{})
+
+type PoolSuite struct{}
+
+func (*PoolSuite) TestCreateAvail(c *check.C) {
+	createAfter := make(chan struct{})
+	provider := &LameInstanceProvider{
+		CreateAfter: createAfter,
+	}
+	pool := &Pool{
+		Logger:   logrus.StandardLogger(),
+		Provider: provider,
+	}
+	notify := pool.Subscribe()
+	defer pool.Unsubscribe(notify)
+	type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
+	type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .01}
+	c.Check(pool.Pending(type1), check.Equals, 0)
+	c.Check(pool.Pending(type2), check.Equals, 0)
+	pool.Create(type2)
+	pool.Create(type1)
+	pool.Create(type2)
+	c.Check(pool.Pending(type1), check.Equals, 1)
+	c.Check(pool.Pending(type2), check.Equals, 2)
+	close(createAfter)
+	pool.Sync()
+	c.Check(pool.Pending(type1), check.Equals, 0)
+	c.Check(pool.Pending(type2), check.Equals, 0)
+	select {
+	case _, ok := <-notify:
+		c.Check(ok, check.Equals, true)
+	default:
+		c.Error("nothing received on notify channel")
+	}
+}
diff --git a/sdk/go/arvados/container.go b/sdk/go/arvados/container.go
index 210ed9981..ef3547048 100644
--- a/sdk/go/arvados/container.go
+++ b/sdk/go/arvados/container.go
@@ -18,7 +18,7 @@ type Container struct {
 	Mounts               map[string]Mount     `json:"mounts"`
 	Output               string               `json:"output"`
 	OutputPath           string               `json:"output_path"`
-	Priority             int                  `json:"priority"`
+	Priority             int64                `json:"priority"`
 	RuntimeConstraints   RuntimeConstraints   `json:"runtime_constraints"`
 	State                ContainerState       `json:"state"`
 	SchedulingParameters SchedulingParameters `json:"scheduling_parameters"`

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list