[ARVADOS] created: 1.2.0-9-gac25f2dac
Git user
git at public.curoverse.com
Mon Aug 20 12:14:31 EDT 2018
at ac25f2daca26e6ee97d53c7ced1bb7550fbcd63b (commit)
commit ac25f2daca26e6ee97d53c7ced1bb7550fbcd63b
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Mon Aug 20 12:14:17 2018 -0400
13964: sketch
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/build/run-build-packages.sh b/build/run-build-packages.sh
index caebac013..11588913e 100755
--- a/build/run-build-packages.sh
+++ b/build/run-build-packages.sh
@@ -295,6 +295,8 @@ package_go_binary cmd/arvados-server arvados-server \
"Arvados server daemons"
package_go_binary cmd/arvados-server arvados-controller \
"Arvados cluster controller daemon"
+package_go_binary cmd/arvados-server arvados-dispatch-cloud \
+ "Arvados cluster cloud dispatch"
package_go_binary sdk/go/crunchrunner crunchrunner \
"Crunchrunner executes a command inside a container and uploads the output"
package_go_binary services/arv-git-httpd arvados-git-httpd \
diff --git a/cmd/arvados-server/arvados-dispatch-cloud.service b/cmd/arvados-server/arvados-dispatch-cloud.service
new file mode 100644
index 000000000..5ea5d45e7
--- /dev/null
+++ b/cmd/arvados-server/arvados-dispatch-cloud.service
@@ -0,0 +1,28 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+[Unit]
+Description=Arvados cloud dispatch
+Documentation=https://doc.arvados.org/
+After=network.target
+AssertPathExists=/etc/arvados/config.yml
+
+# systemd==229 (ubuntu:xenial) obeys StartLimitInterval in the [Unit] section
+StartLimitInterval=0
+
+# systemd>=230 (debian:9) obeys StartLimitIntervalSec in the [Unit] section
+StartLimitIntervalSec=0
+
+[Service]
+Type=notify
+EnvironmentFile=-/etc/arvados/environment
+ExecStart=/usr/bin/arvados-dispatch-cloud
+Restart=always
+RestartSec=1
+
+# systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section
+StartLimitInterval=0
+
+[Install]
+WantedBy=multi-user.target
diff --git a/cmd/arvados-server/cmd.go b/cmd/arvados-server/cmd.go
index 1af3745df..cd15d25dd 100644
--- a/cmd/arvados-server/cmd.go
+++ b/cmd/arvados-server/cmd.go
@@ -9,6 +9,7 @@ import (
"git.curoverse.com/arvados.git/lib/cmd"
"git.curoverse.com/arvados.git/lib/controller"
+ "git.curoverse.com/arvados.git/lib/dispatchcloud"
)
var (
@@ -18,7 +19,8 @@ var (
"-version": cmd.Version(version),
"--version": cmd.Version(version),
- "controller": controller.Command,
+ "controller": controller.Command,
+ "dispatch-cloud": dispatchcloud.Command,
})
)
diff --git a/lib/cloud/interfaces.go b/lib/cloud/interfaces.go
new file mode 100644
index 000000000..22b1df4ee
--- /dev/null
+++ b/lib/cloud/interfaces.go
@@ -0,0 +1,75 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package cloud
+
+import (
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "golang.org/x/crypto/ssh"
+)
+
+// A RateLimitError should be returned by a Provider when the cloud
+// service indicates it is rejecting all API calls for some time
+// interval.
+type RateLimitError interface {
+ // Time before which the caller should expect requests to
+ // fail.
+ EarliestRetry() time.Time
+ error
+}
+
+// A QuotaError should be returned by a Provider when the cloud
+// service indicates the account cannot create more VMs than already
+// exist.
+type QuotaError interface {
+ // If true, don't create more instances until some existing
+ // instances are destroyed. If false, don't handle the error
+ // as a quota error.
+ IsQuotaError() bool
+ error
+}
+
+type InstanceTags map[string]string
+type InstanceID string
+type ImageID string
+
+// Instance is implemented by the provider-specific instance types.
+type Instance interface {
+ // ID returns the provider's instance ID. It must be stable
+ // for the life of the instance.
+ ID() InstanceID
+
+ // String typically returns the cloud-provided instance ID.
+ String() string
+
+ // Cloud provider's "instance type" ID. Matches a ProviderType
+ // in the cluster's InstanceTypes configuration.
+ ProviderType() string
+
+	// Get current tags
+	Tags() InstanceTags
+
+ // Replace tags with the given tags
+ SetTags(InstanceTags) error
+
+	// Shut down the instance
+	Destroy() error
+
+ // SSH server hostname or IP address, or empty string if
+ // unknown while instance is booting.
+ Address() string
+}
+
+type Provider interface {
+	// Create a new instance. If supported by the driver, add the
+	// provided public key to /root/.ssh/authorized_keys.
+ //
+ // The returned error should implement RateLimitError and
+ // QuotaError where applicable.
+ Create(arvados.InstanceType, ImageID, InstanceTags, ssh.PublicKey) (Instance, error)
+
+	// Return all instances, including ones that are booting or
+	// shutting down. Optionally, filter out instances that don't
+	// have all of the given InstanceTags (the caller will ignore
+	// these anyway).
+	Instances(InstanceTags) ([]Instance, error)
+}
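
To sanity-check these interfaces, here is a minimal in-memory stub of the
kind a test suite might use. This is a hypothetical sketch, not part of
this commit; it assumes only the lib/cloud types above (with Tags()
returning InstanceTags) and the ProviderType field of arvados.InstanceType.

    package cloudtest

    import (
        "fmt"
        "sync"

        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "golang.org/x/crypto/ssh"
    )

    // stubInstance implements cloud.Instance.
    type stubInstance struct {
        id       cloud.InstanceID
        provType string
        tags     cloud.InstanceTags
        pvdr     *stubProvider
    }

    func (si *stubInstance) ID() cloud.InstanceID               { return si.id }
    func (si *stubInstance) String() string                     { return string(si.id) }
    func (si *stubInstance) ProviderType() string               { return si.provType }
    func (si *stubInstance) Tags() cloud.InstanceTags           { return si.tags }
    func (si *stubInstance) SetTags(t cloud.InstanceTags) error { si.tags = t; return nil }
    func (si *stubInstance) Address() string                    { return "127.0.0.1:22" }

    // Destroy removes the instance from its provider's list.
    func (si *stubInstance) Destroy() error {
        si.pvdr.mtx.Lock()
        defer si.pvdr.mtx.Unlock()
        delete(si.pvdr.instances, si.id)
        return nil
    }

    // stubProvider implements cloud.Provider.
    type stubProvider struct {
        mtx       sync.Mutex
        instances map[cloud.InstanceID]*stubInstance
        serial    int
    }

    func (sp *stubProvider) Create(it arvados.InstanceType, _ cloud.ImageID, tags cloud.InstanceTags, _ ssh.PublicKey) (cloud.Instance, error) {
        sp.mtx.Lock()
        defer sp.mtx.Unlock()
        if sp.instances == nil {
            sp.instances = map[cloud.InstanceID]*stubInstance{}
        }
        sp.serial++
        inst := &stubInstance{
            id:       cloud.InstanceID(fmt.Sprintf("stub-%d", sp.serial)),
            provType: it.ProviderType,
            tags:     tags,
            pvdr:     sp,
        }
        sp.instances[inst.id] = inst
        return inst, nil
    }

    // Instances returns all stub instances. For brevity it ignores the
    // tag filter, which the interface explicitly permits.
    func (sp *stubProvider) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
        sp.mtx.Lock()
        defer sp.mtx.Unlock()
        var ret []cloud.Instance
        for _, inst := range sp.instances {
            ret = append(ret, inst)
        }
        return ret, nil
    }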
diff --git a/lib/dispatchcloud/cmd.go b/lib/dispatchcloud/cmd.go
new file mode 100644
index 000000000..b2bc91300
--- /dev/null
+++ b/lib/dispatchcloud/cmd.go
@@ -0,0 +1,17 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+ "git.curoverse.com/arvados.git/lib/cmd"
+ "git.curoverse.com/arvados.git/lib/service"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+var Command cmd.Handler = service.Command(arvados.ServiceNameDispatchCloud, newHandler)
+
+func newHandler(cluster *arvados.Cluster, np *arvados.NodeProfile) service.Handler {
+ return &dispatcher{Cluster: cluster, NodeProfile: np}
+}
diff --git a/lib/dispatchcloud/container_queue.go b/lib/dispatchcloud/container_queue.go
new file mode 100644
index 000000000..bc3744f9e
--- /dev/null
+++ b/lib/dispatchcloud/container_queue.go
@@ -0,0 +1,157 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+ "sync"
+ "sync/atomic"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+type containerQueue struct {
+ logger
+ *arvados.Cluster
+ *arvados.Client
+
+ current atomic.Value
+ mtx sync.Mutex
+ keeplocal map[string]struct{}
+}
+
+func (cq *containerQueue) setup() {
+ cq.current.Store(map[string]*arvados.Container{})
+}
+
+func (cq *containerQueue) update() error {
+ cq.mtx.Lock()
+ cq.keeplocal = map[string]struct{}{}
+ cq.mtx.Unlock()
+
+ defer func() { cq.mtx.Lock(); cq.keeplocal = nil; cq.mtx.Unlock() }()
+
+ next, err := cq.poll()
+ if err != nil {
+ return err
+ }
+ cq.mtx.Lock()
+ defer cq.mtx.Unlock()
+ current := cq.current.Load().(map[string]*arvados.Container)
+ for uuid := range cq.keeplocal {
+ next[uuid] = current[uuid]
+ }
+ cq.current.Store(next)
+	return nil
+}
+
+func (cq *containerQueue) Lock(uuid string) error {
+ return cq.apiUpdate(uuid, "lock")
+}
+
+func (cq *containerQueue) Unlock(uuid string) error {
+ return cq.apiUpdate(uuid, "unlock")
+}
+
+func (cq *containerQueue) apiUpdate(uuid, action string) error {
+ var resp arvados.Container
+ err := cq.Client.RequestAndDecode(&resp, "POST", "arvados/v1/containers/"+uuid+"/"+action, nil, nil)
+ if err != nil {
+ return err
+ }
+
+ cq.mtx.Lock()
+ defer cq.mtx.Unlock()
+ if cq.keeplocal != nil {
+ cq.keeplocal[uuid] = struct{}{}
+ }
+ current := cq.current.Load().(map[string]*arvados.Container)
+ if ctr, ok := current[uuid]; !ok {
+ current[uuid] = &resp
+ } else {
+ ctr.State, ctr.Priority, ctr.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID
+ }
+ return nil
+}
+
+func (cq *containerQueue) poll() (map[string]*arvados.Container, error) {
+ current := cq.current.Load().(map[string]*arvados.Container)
+ next := make(map[string]*arvados.Container, len(current))
+	apply := func(updates []arvados.Container) {
+		for _, upd := range updates {
+			if next[upd.UUID] == nil {
+				next[upd.UUID] = &arvados.Container{}
+			}
+			*next[upd.UUID] = upd
+		}
+	}
+ selectParam := []string{"uuid", "state", "priority"}
+ limitParam := 1000
+
+	mine, err := cq.fetchAll(arvados.ResourceListParams{
+		Select:  selectParam,
+		Order:   []string{"uuid"},
+		Limit:   &limitParam,
+		Count:   "none",
+		Filters: []arvados.Filter{{"locked_by_uuid", "=", cq.authUUID}},
+	})
+ if err != nil {
+ return nil, err
+ }
+ apply(mine)
+
+	avail, err := cq.fetchAll(arvados.ResourceListParams{
+		Select:  selectParam,
+		Order:   []string{"uuid"},
+		Limit:   &limitParam,
+		Count:   "none",
+		Filters: []arvados.Filter{{"state", "=", arvados.ContainerStateQueued}, {"priority", ">", "0"}},
+	})
+	if err != nil {
+		return nil, err
+	}
+	apply(avail)
+
+ var missing []string
+ for uuid := range current {
+ if next[uuid] == nil {
+ missing = append(missing, uuid)
+ }
+ }
+ for i, page := 0, 20; i < len(missing); i += page {
+ batch := missing[i:]
+ if len(batch) > page {
+ batch = batch[:page]
+ }
+		ended, err := cq.fetchAll(arvados.ResourceListParams{
+			Select:  selectParam,
+			Order:   []string{"uuid"},
+			Count:   "none",
+			Filters: []arvados.Filter{{"uuid", "in", batch}},
+		})
+		if err != nil {
+			return nil, err
+		}
+		apply(ended)
+ }
+ return next, nil
+}
+
+func (cq *containerQueue) fetchAll(params arvados.ResourceListParams) ([]arvados.Container, error) {
+ var results []arvados.Container
+ params.Offset = 0
+ for {
+ // This list variable must be a new one declared
+ // inside the loop: otherwise, items in the API
+ // response would get deep-merged into the items
+ // loaded in previous iterations.
+ var list arvados.ContainerList
+
+ err := cq.Client.RequestAndDecode(&list, "GET", "arvados/v1/containers", nil, params)
+ if err != nil {
+ return nil, err
+ }
+		results = append(results, list.Items...)
+ params.Offset += len(list.Items)
+ if len(list.Items) == 0 {
+ break
+ }
+ }
+ return results, nil
+}
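
For illustration, a package-internal caller might drive the queue like this
(a hypothetical helper, not part of this commit). Because apiUpdate records
locked UUIDs in keeplocal while an update() is in flight, a concurrent
poll's older snapshot cannot clobber the newly locked state.

    // lockAllQueued refreshes the queue, then tries to lock every
    // container that is still Queued with positive priority.
    func lockAllQueued(cq *containerQueue) error {
        if err := cq.update(); err != nil {
            return err
        }
        current := cq.current.Load().(map[string]*arvados.Container)
        for uuid, ctr := range current {
            if ctr.State == arvados.ContainerStateQueued && ctr.Priority > 0 {
                if err := cq.Lock(uuid); err != nil {
                    cq.Printf("lock %s: %s", uuid, err)
                }
            }
        }
        return nil
    }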
diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
new file mode 100644
index 000000000..c24f56531
--- /dev/null
+++ b/lib/dispatchcloud/dispatcher.go
@@ -0,0 +1,57 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+	"net/http"
+	"sync"
+
+	"git.curoverse.com/arvados.git/lib/cloud"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"github.com/Sirupsen/logrus"
+)
+
+type dispatcher struct {
+	Cluster     *arvados.Cluster
+	NodeProfile *arvados.NodeProfile
+
+	setupOnce      sync.Once
+	logger         logger
+	provider       cloud.Provider
+	workerPool     *workerPool
+	queue          *containerQueue
+	scheduler      *scheduler
+	syncer         *syncer
+	staleLockFixer *staleLockFixer
+	httpHandler    http.Handler
+}
+
+// ServeHTTP implements service.Handler.
+func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ disp.setupOnce.Do(disp.setup)
+	disp.httpHandler.ServeHTTP(w, r)
+}
+
+// CheckHealth implements service.Handler.
+func (disp *dispatcher) CheckHealth() error {
+ disp.setupOnce.Do(disp.setup)
+ return nil
+}
+
+func (disp *dispatcher) setup() {
+	disp.logger = logrus.StandardLogger()
+	disp.provider = &providerProxy{disp.logger, newProvider(disp.Cluster)}
+	disp.workerPool = &workerPool{logger: disp.logger, provider: disp.provider}
+	disp.queue = &containerQueue{logger: disp.logger, Cluster: disp.Cluster}
+	disp.scheduler = &scheduler{logger: disp.logger, containerQueue: disp.queue, workerPool: disp.workerPool}
+	disp.syncer = &syncer{disp.logger, disp.queue, disp.workerPool}
+	disp.staleLockFixer = &staleLockFixer{disp.logger, disp.queue, disp.workerPool}
+
+	go func() {
+		disp.workerPool.Start()
+		// staleLockFixer must be ready before scheduler can start
+		disp.staleLockFixer.Wait()
+		disp.scheduler.setup()
+		go disp.syncer.Run()
+	}()
+
+	mux := http.NewServeMux()
+	mux.HandleFunc("/status.json", disp.serveStatusJSON)
+	disp.httpHandler = mux
+}
diff --git a/lib/dispatchcloud/exclusive_worker.go b/lib/dispatchcloud/exclusive_worker.go
new file mode 100644
index 000000000..44d35e257
--- /dev/null
+++ b/lib/dispatchcloud/exclusive_worker.go
@@ -0,0 +1,26 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import "sync"
+
+type exclusiveWorker struct {
+ worker
+
+ firstCall sync.Once
+ ready chan struct{}
+}
+
+func (ew *exclusiveWorker) Running() []string {
+ ew.firstCall.Do(func() {
+		ew.ready = make(chan struct{})
+ go func() {
+ ew.worker.Running()
+ close(ew.ready)
+ }()
+ })
+ select {
+ case <-ew.ready:
+ return ew.worker.Running()
+ default:
+ return nil
+ }
+}
diff --git a/lib/dispatchcloud/logger.go b/lib/dispatchcloud/logger.go
new file mode 100644
index 000000000..0b6162b48
--- /dev/null
+++ b/lib/dispatchcloud/logger.go
@@ -0,0 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+type logger interface {
+	Printf(string, ...interface{})
+	Warnf(string, ...interface{})
+	Infof(string, ...interface{})
+	Debugf(string, ...interface{})
+}
diff --git a/lib/dispatchcloud/readme.go b/lib/dispatchcloud/readme.go
new file mode 100644
index 000000000..8f47d86b2
--- /dev/null
+++ b/lib/dispatchcloud/readme.go
@@ -0,0 +1,80 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+// A dispatcher comprises a container queue, a scheduler, a worker
+// pool, a cloud provider, a stale-lock fixer, and a syncer.
+// 1. Choose a provider.
+// 2. Start a worker pool.
+// 3. Start a container queue.
+// 4. Run a stale-lock fixer.
+// 5. Start a scheduler.
+// 6. Start a syncer.
+//
+//
+// A provider (cloud driver) creates new cloud VM instances and gets
+// the latest list of instances. The returned instances implement
+// proxies to the provider's metadata and control interfaces (get IP
+// address, update tags, shutdown).
+//
+//
+// A commander sets up an SSH connection to an instance, and
+// retries/reconnects when needed, so it is ready to execute remote
+// commands on behalf of callers.
+//
+//
+// A workerPool tracks workers' instance types and readiness states
+// (available to do work now, booting, suffering a temporary network
+// outage, shutting down). It loads internal state from the cloud
+// provider's list of instances at startup, and syncs periodically
+// after that.
+//
+//
+// A worker manages communication with a cloud instance, and notifies
+// the worker pool (using its Wait() method) when it becomes available
+// to do work after booting, finishing a container, emerging from a
+// network outage, etc.
+//
+//
+// An exclusiveWorker wraps a worker with an assurance that its
+// instance isn't running any containers started by another worker
+// pool. (Running() returns nil until the first call to the wrapped
+// worker's Running() finishes.)
+//
+//
+// A container queue tracks the known state (according to
+// arvados-controller) of each container of interest -- i.e., queued,
+// or locked/running using our own dispatch token. It also proxies the
+// dispatcher's lock/unlock/cancel requests to the controller. It
+// handles concurrent refresh and update operations without exposing
+// out-of-order updates to its callers. (It drops any new information
+// that might have originated before its own most recent
+// lock/unlock/cancel operation.)
+//
+//
+// A stale-lock fixer waits for any already-locked containers (i.e.,
+// locked by a prior server process) to appear on workers as the
+// worker pool recovers its state. It unlocks/requeues any that still
+// remain when all workers are recovered or shut down, or when its
+// timer expires.
+//
+//
+// A scheduler chooses which containers to assign to which idle
+// workers, and decides what to do when there are not enough idle
+// workers (including shutting down some idle nodes).
+//
+//
+// A syncer updates state to Cancelled when a running container
+// process dies without finalizing its entry in the controller
+// database. It also calls the worker pool to kill containers that
+// have priority=0 while locked or running.
+//
+//
+// A provider proxy wraps a provider with rate-limiting logic. After
+// the wrapped provider receives a cloud.RateLimitError, the proxy
+// starts returning errors to callers immediately without calling
+// through to the wrapped provider.
+//
+//
+// TBD: Bootstrapping script via SSH, too? Future version.
+//
+// TBD: drain instance, keep instance alive
+// TBD: metrics, diagnostics
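
dispatcher.go refers to a providerProxy that is not defined anywhere in
this commit. Here is one possible shape for it, following the
rate-limiting description above. This is a sketch only; the field layout
and the errRateLimited sentinel are assumptions.

    package dispatchcloud

    import (
        "errors"
        "sync"
        "time"

        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "golang.org/x/crypto/ssh"
    )

    var errRateLimited = errors.New("cloud provider is rate-limiting requests")

    // providerProxy wraps a cloud.Provider. After the wrapped provider
    // returns a cloud.RateLimitError, calls fail fast until the
    // indicated retry time arrives.
    type providerProxy struct {
        logger   logger
        provider cloud.Provider

        mtx        sync.Mutex
        retryAfter time.Time
    }

    // checkRateLimit records the retry time from a RateLimitError, if
    // err is one, and passes err through either way.
    func (pp *providerProxy) checkRateLimit(err error) error {
        if rl, ok := err.(cloud.RateLimitError); ok {
            pp.mtx.Lock()
            pp.retryAfter = rl.EarliestRetry()
            pp.mtx.Unlock()
            pp.logger.Warnf("provider rate limit in effect until %s", rl.EarliestRetry())
        }
        return err
    }

    func (pp *providerProxy) limited() bool {
        pp.mtx.Lock()
        defer pp.mtx.Unlock()
        return time.Now().Before(pp.retryAfter)
    }

    func (pp *providerProxy) Create(it arvados.InstanceType, img cloud.ImageID, tags cloud.InstanceTags, key ssh.PublicKey) (cloud.Instance, error) {
        if pp.limited() {
            return nil, errRateLimited
        }
        inst, err := pp.provider.Create(it, img, tags, key)
        return inst, pp.checkRateLimit(err)
    }

    func (pp *providerProxy) Instances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
        if pp.limited() {
            return nil, errRateLimited
        }
        insts, err := pp.provider.Instances(tags)
        return insts, pp.checkRateLimit(err)
    }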
diff --git a/lib/dispatchcloud/scheduler.go b/lib/dispatchcloud/scheduler.go
new file mode 100644
index 000000000..795b5a127
--- /dev/null
+++ b/lib/dispatchcloud/scheduler.go
@@ -0,0 +1,113 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+ "sort"
+ "sync"
+ "time"
+
+ "git.curoverse.com/arvados.git/lib/cloud"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+type container struct {
+ container arvados.Container
+ wantsType arvados.InstanceType
+ worker worker // nil while queued
+ runningPID uint // 0 if crunch-run supervisor process is not running
+}
+
+// A scheduler assigns queued containers to available workers, and
+// creates new workers when there aren't enough available.
+//
+// If it encounters problems creating new workers, the scheduler also
+// shuts down idle workers in case they are consuming quota.
+// Otherwise, workers are responsible for shutting themselves down
+// after the configured idle time threshold.
+type scheduler struct {
+	logger
+	*containerQueue
+	*workerPool
+
+	queue           []*container
+	queueMtx        sync.Mutex
+	instances       map[cloud.Instance]struct{}
+	enqueueOrUpdate chan container
+	runOnce         sync.Once
+}
+
+func (sched *scheduler) setup() {
+	sched.enqueueOrUpdate = make(chan container, 1)
+	go sched.runOnce.Do(sched.run)
+}
+
+func (sched *scheduler) run() {
+	wakeup := make(chan struct{}, 1)
+	// TBD: the worker pool should supply these per-instance-type
+	// channels; with none registered, nothing is dispatched yet.
+	todo := map[arvados.InstanceType]chan *container{}
+	ctrs := map[string]*container{} // key is container UUID
+	timer := time.NewTimer(time.Second)
+	queue := []*container{}
+	for {
+		select {
+		case ctr := <-sched.enqueueOrUpdate:
+			// Get a newly queued container, or update
+			// priority/state.
+			if ctrs[ctr.container.UUID] == nil {
+				ctrs[ctr.container.UUID] = &ctr
+			} else {
+				ctrs[ctr.container.UUID].container = ctr.container
+				continue
+			}
+		case <-timer.C:
+		case <-wakeup:
+		}
+
+		queue = queue[:0]
+		for _, ctr := range ctrs {
+			if ctr.container.State == arvados.ContainerStateLocked {
+				queue = append(queue, ctr)
+			}
+		}
+		sort.Slice(queue, func(i, j int) bool {
+			if d := queue[i].container.Priority - queue[j].container.Priority; d != 0 {
+				return d > 0
+			}
+			return queue[i].container.UUID > queue[j].container.UUID
+		})
+
+		// Dispatch highest priority container to the idle
+		// worker with the shortest idle time.
+		for len(queue) > 0 {
+			select {
+			case todo[queue[0].wantsType] <- queue[0]:
+				queue = queue[1:]
+				continue
+			default:
+			}
+			break
+		}
+
+		// Compare queue to booting.
+	}
+}
+
+// DispatchFunc returns a dispatch.DispatchFunc.
+func (sched *scheduler) DispatchFunc(cluster *arvados.Cluster) func(actr arvados.Container, update <-chan arvados.Container) error {
+	go sched.runOnce.Do(sched.run)
+	return func(actr arvados.Container, update <-chan arvados.Container) error {
+		it, err := ChooseInstanceType(cluster, &actr)
+		if err != nil {
+			return err
+		}
+		sched.enqueueOrUpdate <- container{
+			container: actr,
+			wantsType: it,
+		}
+		for actr := range update {
+			sched.enqueueOrUpdate <- container{
+				container: actr,
+				wantsType: it,
+			}
+		}
+		return nil
+	}
+}
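
The ordering rule in run() (higher priority first, ties broken
deterministically by descending UUID) could be pinned down by a table test
like this hypothetical one, which would live in a _test.go file in this
package and import sort, reflect, testing, and the arvados SDK:

    // TestQueueOrdering checks the comparison function used by run().
    func TestQueueOrdering(t *testing.T) {
        queue := []*container{
            {container: arvados.Container{UUID: "zzzzz-dz642-000000000000aaa", Priority: 5}},
            {container: arvados.Container{UUID: "zzzzz-dz642-000000000000ccc", Priority: 9}},
            {container: arvados.Container{UUID: "zzzzz-dz642-000000000000bbb", Priority: 5}},
        }
        sort.Slice(queue, func(i, j int) bool {
            if d := queue[i].container.Priority - queue[j].container.Priority; d != 0 {
                return d > 0
            }
            return queue[i].container.UUID > queue[j].container.UUID
        })
        var got []string
        for _, ctr := range queue {
            got = append(got, ctr.container.UUID)
        }
        want := []string{
            "zzzzz-dz642-000000000000ccc",
            "zzzzz-dz642-000000000000bbb",
            "zzzzz-dz642-000000000000aaa",
        }
        if !reflect.DeepEqual(got, want) {
            t.Errorf("got %v, want %v", got, want)
        }
    }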
diff --git a/lib/dispatchcloud/ssh.go b/lib/dispatchcloud/ssh.go
new file mode 100644
index 000000000..3b8a1dc80
--- /dev/null
+++ b/lib/dispatchcloud/ssh.go
@@ -0,0 +1,127 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+ "bytes"
+ "errors"
+ "io"
+ "sync/atomic"
+ "time"
+
+ "golang.org/x/crypto/ssh"
+)
+
+type addressable interface {
+ String() string // string representation for logging
+ Address() string // tcp address
+}
+
+type sshCommander struct {
+ logger logger
+ host addressable
+ client atomic.Value // last *ssh.Client created
+ ready chan bool // closed if/when client gets a *ssh.Client
+ reconnect chan bool
+ closed chan bool // closed when commander is Close()d
+}
+
+func newSSHCommander(logger logger, host addressable) *sshCommander {
+ sc := &sshCommander{
+ logger: logger,
+ host: host,
+ ready: make(chan bool),
+ reconnect: make(chan bool, 1),
+ closed: make(chan bool),
+ }
+ go sc.run()
+ return sc
+}
+
+func (sc *sshCommander) RunCommand(cmd string, stdin io.Reader) ([]byte, []byte, error) {
+ select {
+ case <-sc.ready:
+ case <-sc.closed:
+ return nil, nil, errors.New("commander is closed")
+ }
+ client := sc.client.Load().(*ssh.Client)
+ session, err := client.NewSession()
+ if err != nil {
+ return nil, nil, err
+ }
+ defer session.Close()
+	var stdout, stderr bytes.Buffer
+	session.Stdout = &stdout
+	session.Stderr = &stderr
+	if stdin != nil {
+		session.Stdin = stdin
+	}
+ sc.logger.Debugf("ssh: %s: running command: %s", sc.host, cmd)
+ err = session.Run(cmd)
+ return stdout.Bytes(), stderr.Bytes(), err
+}
+
+func (sc *sshCommander) Reconnect() {
+ select {
+ case sc.reconnect <- true:
+ default:
+ }
+}
+
+func (sc *sshCommander) Close() {
+ // Ensure a false value gets into the reconnect channel, even
+ // if it's full/filling with true values from other
+ // goroutines.
+ for {
+ select {
+ case sc.reconnect <- false:
+ return
+ case <-sc.reconnect:
+ }
+ }
+}
+
+func (sc *sshCommander) run() {
+ defer close(sc.closed)
+ var oldclient *ssh.Client
+ for {
+ select {
+ case alive := <-sc.reconnect:
+ if !alive {
+ return
+ }
+ default:
+ }
+ addr := sc.host.Address()
+ if addr == "" {
+ time.Sleep(time.Second)
+ continue
+ }
+ sc.logger.Debugf("ssh: %s: connecting to %s", sc.host, addr)
+ client, err := ssh.Dial("tcp", addr, &ssh.ClientConfig{
+ User: "root",
+			Auth: []ssh.AuthMethod{
+				// TODO: real key-based auth; this
+				// hardcoded password only suits a stub
+				// SSH target during bring-up
+				ssh.Password("1234"),
+			},
+ // TODO: validate the host key
+ HostKeyCallback: ssh.InsecureIgnoreHostKey(),
+ Timeout: time.Minute,
+ })
+ if err != nil {
+ sc.logger.Warnf("ssh: %s: connect failed: %s", sc.host, err)
+ time.Sleep(10 * time.Second)
+ continue
+ }
+
+ sc.client.Store(client)
+ if oldclient == nil {
+ close(sc.ready)
+ } else {
+ oldclient.Close()
+ }
+ oldclient = client
+ sc.logger.Debugf("ssh: %s: connected to %s", sc.host, addr)
+
+ if !<-sc.reconnect {
+ return
+ }
+ }
+}
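
For illustration, a caller might drive a commander like this (a
hypothetical helper, not part of this commit; "uptime" stands in for a
real remote command):

    // runUptime runs one command on an addressable instance,
    // requesting a reconnect if the command fails.
    func runUptime(lg logger, inst addressable) error {
        sc := newSSHCommander(lg, inst)
        defer sc.Close()
        stdout, stderr, err := sc.RunCommand("uptime", nil)
        if err != nil {
            // A failed command may indicate a dead connection;
            // ask the run() loop to dial again before retrying.
            sc.Reconnect()
            return err
        }
        lg.Debugf("uptime: %q (stderr %q)", stdout, stderr)
        return nil
    }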
diff --git a/lib/dispatchcloud/worker.go b/lib/dispatchcloud/worker.go
new file mode 100644
index 000000000..6b202a00d
--- /dev/null
+++ b/lib/dispatchcloud/worker.go
@@ -0,0 +1,170 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+ "errors"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "git.curoverse.com/arvados.git/lib/cloud"
+)
+
+// A worker wraps a cloud.Instance. It manages the instance's life
+// cycle (boot, monitor, shut down) and handles all communication
+// involved in starting and stopping containers.
+type worker interface {
+ Running() []string
+ Wait() bool
+ StartContainer(ctr *container) bool
+}
+
+// An sshWorker manages communication with a single cloud instance
+// over SSH.
+type sshWorker struct {
+	logger    logger
+	starts    uint64
+	instance  cloud.Instance
+	ready     bool
+	readyCond sync.Cond
+	running   []string
+	todo      chan *container
+	lastPing  time.Time
+	done      chan struct{} // closed if worker has terminated
+	err       error
+}
+
+// Wait returns true when the worker is ready to start a new
+// container, or false if the worker terminates before becoming ready.
+func (wkr *sshWorker) Wait() bool {
+	wkr.readyCond.L.Lock()
+	defer wkr.readyCond.L.Unlock()
+	for {
+		select {
+		case <-wkr.done:
+			return false
+		default:
+		}
+		if wkr.ready {
+			return true
+		}
+		wkr.readyCond.Wait()
+	}
+}
+
+// Running returns the UUIDs of the containers currently running on
+// the instance.
+//
+// Running blocks until the instance asserts the VM is new (has never
+// started a container), the worker's first probe succeeds, or the
+// worker shuts down. After that, it returns instantaneously.
+//
+// A workerPool should use this to discover containers that are still
+// running after being abandoned by a previous server instantiation.
+func (wkr *sshWorker) Running() []string {
+ wkr.readyCond.L.Lock()
+ defer wkr.readyCond.L.Unlock()
+ for !wkr.ready {
+ wkr.readyCond.Wait()
+ }
+ return wkr.running
+}
+
+// StartContainer starts the given container on the worker and returns
+// true.
+//
+// If the container cannot be started now (typically because a failed
+// probe made it non-ready since the scheduler's last call to Wait()),
+// it returns false.
+func (wkr *sshWorker) StartContainer(ctr *container) bool {
+	wkr.readyCond.L.Lock()
+	defer wkr.readyCond.L.Unlock()
+	if !wkr.ready {
+		return false
+	}
+	select {
+	case wkr.todo <- ctr:
+	default:
+		// run() is not ready to accept work right now.
+		return false
+	}
+	wkr.ready = false
+	wkr.running = append(wkr.running, ctr.container.UUID)
+	atomic.AddUint64(&wkr.starts, 1)
+	return true
+}
+
+// newSSHWorker creates a new sshWorker.
+//
+// In a new goroutine, the worker calls getInstance to get its backing
+// instance.
+//
+// Typically getInstance will either call (cloud.Provider)Create() or
+// return an existing instance that doesn't already belong to another
+// worker.
+func newSSHWorker(logger logger, getInstance func() (cloud.Instance, error)) *sshWorker {
+	wkr := &sshWorker{
+		logger: logger,
+		todo:   make(chan *container),
+		done:   make(chan struct{}),
+	}
+	wkr.readyCond.L = new(sync.Mutex)
+	go func() {
+		inst, err := getInstance()
+		if err != nil {
+			wkr.err = err
+			close(wkr.done)
+			return
+		}
+		wkr.instance = inst
+		wkr.run()
+	}()
+	return wkr
+}
+
+// probeAndUpdate calls probe() and takes any appropriate actions
+// based on the result.
+//
+// Possible actions include waking up goroutines in Wait(), printing
+// logs, and notifying the scheduler that a container is running.
+func (wkr *sshWorker) probeAndUpdate() {
+	starts := atomic.LoadUint64(&wkr.starts)
+	booted, uuids, err := wkr.probe(time.Minute / 2)
+	if err != nil {
+		wkr.readyCond.L.Lock()
+		wkr.ready = false
+		wkr.readyCond.L.Unlock()
+		wkr.logger.Infof("instance %s not responding for %s: %s", wkr.instance, time.Since(wkr.lastPing), err)
+		if time.Since(wkr.lastPing) > maxPingFailTime {
+			wkr.logger.Warnf("instance %s unresponsive since %s, shutting down", wkr.instance, wkr.lastPing)
+			if err := wkr.instance.Destroy(); err != nil {
+				wkr.logger.Warnf("instance %s: destroy failed: %s", wkr.instance, err)
+			}
+			// TBD: signal run() to stop probing the
+			// destroyed instance
+		}
+		return
+	}
+	if !booted {
+		return
+	}
+	wkr.lastPing = time.Now()
+	wkr.readyCond.L.Lock()
+	defer wkr.readyCond.L.Unlock()
+	if starts != atomic.LoadUint64(&wkr.starts) {
+		// List of UUIDs is stale.
+		return
+	}
+	wkr.ready = true
+	wkr.running = uuids
+	wkr.readyCond.Broadcast()
+}
+
+var errWorkerTerminated = errors.New("worker terminated")
+
+func (wkr *sshWorker) run() {
+	defer func() { wkr.ready = true; wkr.readyCond.Broadcast() }()
+	defer close(wkr.done)
+
+	wkr.lastPing = time.Now()
+	ticker := time.NewTicker(time.Minute)
+	defer ticker.Stop()
+
+	go wkr.probeAndUpdate()
+	for {
+		select {
+		case <-ticker.C:
+			go wkr.probeAndUpdate()
+		case ctr := <-wkr.todo:
+			// TBD: send "run container" command to the
+			// instance
+			_ = ctr
+		}
+	}
+}
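
The probe method called above is never defined in this sketch. Here is one
plausible shape, using the commander from ssh.go. Everything in it is an
assumption rather than part of the commit: the cmdr field on sshWorker,
the omitted timeout handling, and a crunch-run flag that lists running
container UUIDs one per line; it would also need "strings" in the import
list.

    // probe asks the instance which containers it is running. The
    // first return value reports whether the instance has finished
    // booting.
    func (wkr *sshWorker) probe(timeout time.Duration) (bool, []string, error) {
        // (timeout handling elided in this sketch)
        stdout, stderr, err := wkr.cmdr.RunCommand("crunch-run --list", nil)
        if err != nil {
            wkr.logger.Debugf("probe %s: %s (stderr %q)", wkr.instance, err, stderr)
            return false, nil, err
        }
        var uuids []string
        for _, uuid := range strings.Split(string(stdout), "\n") {
            if uuid != "" {
                uuids = append(uuids, uuid)
            }
        }
        return true, uuids, nil
    }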
diff --git a/lib/dispatchcloud/worker_pool.go b/lib/dispatchcloud/worker_pool.go
new file mode 100644
index 000000000..23703491e
--- /dev/null
+++ b/lib/dispatchcloud/worker_pool.go
@@ -0,0 +1,138 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+	"io"
+	"sync"
+	"time"
+
+	"git.curoverse.com/arvados.git/lib/cloud"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+type commander interface {
+ RunCommand(cmd string, stdin io.Reader) (stdout []byte, stderr []byte, err error)
+ Reconnect()
+ Close()
+}
+
+type workerPool struct {
+	logger   logger
+	provider cloud.Provider
+
+	workers   map[arvados.InstanceType]map[worker]bool
+	available map[arvados.InstanceType][]worker
+	plan      map[*worker]*container
+	booted    chan struct{}
+	setupOnce sync.Once
+	mtx       sync.RWMutex
+}
+
+func (wp *workerPool) setup() {
+	wp.workers = map[arvados.InstanceType]map[worker]bool{}
+	wp.available = map[arvados.InstanceType][]worker{}
+	wp.plan = map[*worker]*container{}
+	wp.booted = make(chan struct{})
+
+	go wp.loadInstances()
+}
+
+func (wp *workerPool) loadInstances() {
+	wp.logger.Infof("worker pool: bootstrapping")
+
+	var instances []cloud.Instance
+	var err error
+	for {
+		instances, err = wp.provider.Instances(nil)
+		if err == nil {
+			break
+		}
+		wait := 15 * time.Second
+		wp.logger.Warnf("worker pool: error getting instance list (retry in %s): %s", wait, err)
+		time.Sleep(wait)
+	}
+
+	var wg sync.WaitGroup
+	for _, inst := range instances {
+		inst := inst
+		itype := taggedInstanceType(inst)
+		wkr := newSSHWorker(wp.logger, func() (cloud.Instance, error) { return inst, nil })
+		wp.mtx.Lock()
+		if wp.workers[itype] == nil {
+			wp.workers[itype] = map[worker]bool{}
+		}
+		wp.workers[itype][wkr] = false
+		wp.mtx.Unlock()
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			wkr.Running()
+			wp.mtx.Lock()
+			wp.workers[itype][wkr] = true
+			wp.mtx.Unlock()
+		}()
+	}
+	go func() {
+		wg.Wait()
+		wp.logger.Infof("worker pool: bootstrapping done")
+		close(wp.booted)
+	}()
+
+	wait := time.Minute
+	for {
+		wp.logger.Debugf("workerPool: wait %s", wait)
+		time.Sleep(wait)
+		wp.logger.Debugf("workerPool: getting instance list")
+		instances, err := wp.provider.Instances(nil)
+		if err != nil {
+			wp.logger.Warnf("workerPool: error getting instance list: %s", err)
+			continue
+		}
+		for _, inst := range instances {
+			itype := taggedInstanceType(inst)
+			wp.mtx.Lock()
+			if wp.workers[itype] == nil {
+				wp.workers[itype] = map[worker]bool{}
+			}
+			// TBD: create workers for new instances, and
+			// shut down workers whose instances are gone.
+			wp.mtx.Unlock()
+		}
+	}
+}
+
+func (wp *workerPool) Start() {
+ wp.setupOnce.Do(wp.setup)
+}
+
+// Available returns the number of non-allocated (i.e., booting or
+// idle) instances of the given type.
+func (wp *workerPool) Available(it arvados.InstanceType) int {
+ wp.Start()
+ wp.mtx.RLock()
+ defer wp.mtx.RUnlock()
+ return len(wp.workers[it])
+}
+
+// Create creates a new instance with the given type, and adds it to
+// the worker pool.
+func (wp *workerPool) Create(it arvados.InstanceType) error {
+ wp.Start()
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
+ // ...
+}
+
+// Shutdown shuts down a worker with the given type, or returns false
+// if all workers with the given type are busy.
+func (wp *workerPool) Shutdown(it arvados.InstanceType) bool {
+ wp.Start()
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
+ // ...
+}
+
+// StartContainer starts a container on an idle worker immediately
+// if possible, otherwise returns false.
+func (wp *workerPool) StartContainer(ctr *container) bool {
+	wp.Start()
+	// ...
+}
diff --git a/sdk/go/arvados/container.go b/sdk/go/arvados/container.go
index 210ed9981..ef3547048 100644
--- a/sdk/go/arvados/container.go
+++ b/sdk/go/arvados/container.go
@@ -18,7 +18,7 @@ type Container struct {
Mounts map[string]Mount `json:"mounts"`
Output string `json:"output"`
OutputPath string `json:"output_path"`
- Priority int `json:"priority"`
+ Priority int64 `json:"priority"`
RuntimeConstraints RuntimeConstraints `json:"runtime_constraints"`
State ContainerState `json:"state"`
SchedulingParameters SchedulingParameters `json:"scheduling_parameters"`
-----------------------------------------------------------------------
hooks/post-receive
--