[ARVADOS] created: 1.2.0-9-gac25f2dac
    Git user 
    git at public.curoverse.com
       
    Mon Aug 20 12:14:31 EDT 2018
    
    
  
        at  ac25f2daca26e6ee97d53c7ced1bb7550fbcd63b (commit)
commit ac25f2daca26e6ee97d53c7ced1bb7550fbcd63b
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Mon Aug 20 12:14:17 2018 -0400
    13964: sketch
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/build/run-build-packages.sh b/build/run-build-packages.sh
index caebac013..11588913e 100755
--- a/build/run-build-packages.sh
+++ b/build/run-build-packages.sh
@@ -295,6 +295,8 @@ package_go_binary cmd/arvados-server arvados-server \
     "Arvados server daemons"
 package_go_binary cmd/arvados-server arvados-controller \
     "Arvados cluster controller daemon"
+package_go_binary cmd/arvados-server arvados-dispatch-cloud \
+    "Arvados cluster cloud dispatch"
 package_go_binary sdk/go/crunchrunner crunchrunner \
     "Crunchrunner executes a command inside a container and uploads the output"
 package_go_binary services/arv-git-httpd arvados-git-httpd \
diff --git a/cmd/arvados-server/arvados-dispatch-cloud.service b/cmd/arvados-server/arvados-dispatch-cloud.service
new file mode 100644
index 000000000..5ea5d45e7
--- /dev/null
+++ b/cmd/arvados-server/arvados-dispatch-cloud.service
@@ -0,0 +1,28 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+[Unit]
+Description=Arvados cloud dispatch
+Documentation=https://doc.arvados.org/
+After=network.target
+AssertPathExists=/etc/arvados/config.yml
+
+# systemd==229 (ubuntu:xenial) obeys StartLimitInterval in the [Unit] section
+StartLimitInterval=0
+
+# systemd>=230 (debian:9) obeys StartLimitIntervalSec in the [Unit] section
+StartLimitIntervalSec=0
+
+[Service]
+Type=notify
+EnvironmentFile=-/etc/arvados/environment
+ExecStart=/usr/bin/arvados-dispatch-cloud
+Restart=always
+RestartSec=1
+
+# systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section
+StartLimitInterval=0
+
+[Install]
+WantedBy=multi-user.target
diff --git a/cmd/arvados-server/cmd.go b/cmd/arvados-server/cmd.go
index 1af3745df..cd15d25dd 100644
--- a/cmd/arvados-server/cmd.go
+++ b/cmd/arvados-server/cmd.go
@@ -9,6 +9,7 @@ import (
 
 	"git.curoverse.com/arvados.git/lib/cmd"
 	"git.curoverse.com/arvados.git/lib/controller"
+	"git.curoverse.com/arvados.git/lib/dispatchcloud"
 )
 
 var (
@@ -18,7 +19,8 @@ var (
 		"-version":  cmd.Version(version),
 		"--version": cmd.Version(version),
 
-		"controller": controller.Command,
+		"controller":     controller.Command,
+		"dispatch-cloud": dispatchcloud.Command,
 	})
 )
 
diff --git a/lib/cloud/interfaces.go b/lib/cloud/interfaces.go
new file mode 100644
index 000000000..22b1df4ee
--- /dev/null
+++ b/lib/cloud/interfaces.go
@@ -0,0 +1,75 @@
+package cloud
+
+import (
+	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"golang.org/x/crypto/ssh"
+)
+
+// A RateLimitError should be returned by a Provider when the cloud
+// service indicates it is rejecting all API calls for some time
+// interval.
+type RateLimitError interface {
+	// Time before which the caller should expect requests to
+	// fail.
+	EarliestRetry() time.Time
+	error
+}
+
+// A QuotaError should be returned by a Provider when the cloud
+// service indicates the account cannot create more VMs than already
+// exist.
+type QuotaError interface {
+	// If true, don't create more instances until some existing
+	// instances are destroyed. If false, don't handle the error
+	// as a quota error.
+	IsQuotaError() bool
+	error
+}
+
+type InstanceTags map[string]string
+type InstanceID string
+type ImageID string
+
+// Instance is implemented by the provider-specific instance types.
+type Instance interface {
+	// ID returns the provider's instance ID. It must be stable
+	// for the life of the instance.
+	ID() InstanceID
+
+	// String typically returns the cloud-provided instance ID.
+	String() string
+
+	// Cloud provider's "instance type" ID. Matches a ProviderType
+	// in the cluster's InstanceTypes configuration.
+	ProviderType() string
+
+	// Get current tags
+	Tags() []string
+
+	// Replace tags with the given tags
+	SetTags(InstanceTags) error
+
+	// Shut down the node
+	Destroy() error
+
+	// SSH server hostname or IP address, or empty string if
+	// unknown while instance is booting.
+	Address() string
+}
+
+type Provider interface {
+	// Create a new instance. If supported by the driver, add the
+	// provide public key to /root/.ssh/authorized_keys.
+	//
+	// The returned error should implement RateLimitError and
+	// QuotaError where applicable.
+	Create(arvados.InstanceType, ImageID, InstanceTags, ssh.PublicKey) (Instance, error)
+
+	// Return all instances, including ones that are booting or
+	// shutting down. Optionally, filter out nodes that don't have
+	// all of the given InstanceTags (the caller will ignore these
+	// anyway).
+	Instances(InstanceTags) ([]Instance, error)
+}
diff --git a/lib/dispatchcloud/cmd.go b/lib/dispatchcloud/cmd.go
new file mode 100644
index 000000000..b2bc91300
--- /dev/null
+++ b/lib/dispatchcloud/cmd.go
@@ -0,0 +1,17 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+	"git.curoverse.com/arvados.git/lib/cmd"
+	"git.curoverse.com/arvados.git/lib/service"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+var Command cmd.Handler = service.Command(arvados.ServiceNameDispatchCloud, newHandler)
+
+func newHandler(cluster *arvados.Cluster, np *arvados.NodeProfile) service.Handler {
+	return &dispatcher{Cluster: cluster, NodeProfile: np}
+}
diff --git a/lib/dispatchcloud/container_queue.go b/lib/dispatchcloud/container_queue.go
new file mode 100644
index 000000000..bc3744f9e
--- /dev/null
+++ b/lib/dispatchcloud/container_queue.go
@@ -0,0 +1,157 @@
+package dispatchcloud
+
+import (
+	"sync"
+	"sync/atomic"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+type containerQueue struct {
+	logger
+	*arvados.Cluster
+	*arvados.Client
+
+	current   atomic.Value
+	mtx       sync.Mutex
+	keeplocal map[string]struct{}
+}
+
+func (cq *containerQueue) setup() {
+	cq.current.Store(map[string]*arvados.Container{})
+}
+
+func (cq *containerQueue) update() error {
+	cq.mtx.Lock()
+	cq.keeplocal = map[string]struct{}{}
+	cq.mtx.Unlock()
+
+	defer func() { cq.mtx.Lock(); cq.keeplocal = nil; cq.mtx.Unlock() }()
+
+	next, err := cq.poll()
+	if err != nil {
+		return err
+	}
+	cq.mtx.Lock()
+	defer cq.mtx.Unlock()
+	current := cq.current.Load().(map[string]*arvados.Container)
+	for uuid := range cq.keeplocal {
+		next[uuid] = current[uuid]
+	}
+	cq.current.Store(next)
+}
+
+func (cq *containerQueue) Lock(uuid string) error {
+	return cq.apiUpdate(uuid, "lock")
+}
+
+func (cq *containerQueue) Unlock(uuid string) error {
+	return cq.apiUpdate(uuid, "unlock")
+}
+
+func (cq *containerQueue) apiUpdate(uuid, action string) error {
+	var resp arvados.Container
+	err := cq.Client.RequestAndDecode(&resp, "POST", "arvados/v1/containers/"+uuid+"/"+action, nil, nil)
+	if err != nil {
+		return err
+	}
+
+	cq.mtx.Lock()
+	defer cq.mtx.Unlock()
+	if cq.keeplocal != nil {
+		cq.keeplocal[uuid] = struct{}{}
+	}
+	current := cq.current.Load().(map[string]*arvados.Container)
+	if ctr, ok := current[uuid]; !ok {
+		current[uuid] = &resp
+	} else {
+		ctr.State, ctr.Priority, ctr.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID
+	}
+	return nil
+}
+
+func (cq *containerQueue) poll() (map[string]*arvados.Container, error) {
+	current := cq.current.Load().(map[string]*arvados.Container)
+	next := make(map[string]*arvados.Container, len(current))
+	apply := func(updates []arvados.Container) {
+		for _, upd := range updates {
+			if next[ctr.UUID] == nil {
+				next[ctr.UUID] = &arvados.Container{}
+			}
+			*next[ctr.UUID] = upd
+		}
+	}
+	selectParam := []string{"uuid", "state", "priority"}
+	limitParam := 1000
+
+	mine, err := cq.fetchAll(arvados.ResourceListParams{
+		Select:  selectParam,
+		Order:   []string{"uuid"},
+		Limit:   &limitParam,
+		Count:   "none",
+		Filters: {{"locked_by_uuid", "=", cq.authUUID}},
+	})
+	if err != nil {
+		return nil, err
+	}
+	apply(mine)
+
+	avail, err := cq.fetchAll(arvados.ResourceListParams{
+		Select:  selectParam,
+		Order:   []string{"uuid"},
+		Limit:   &limitParam,
+		Count:   "none",
+		Filters: {{"state", "=", Queued}, {"priority", ">", "0"}},
+	})
+	if err != nil {
+		return err
+	}
+	apply(avail)
+
+	var missing []string
+	for uuid := range current {
+		if next[uuid] == nil {
+			missing = append(missing, uuid)
+		}
+	}
+	for i, page := 0, 20; i < len(missing); i += page {
+		batch := missing[i:]
+		if len(batch) > page {
+			batch = batch[:page]
+		}
+		ended, err := cq.fetchAll(arvados.ResourceListParams{
+			Select:  selectParam,
+			Order:   []string{"uuid"},
+			Count:   "none",
+			Filters: {{"uuid", "in", batch}},
+		})
+		if err != nil {
+			return err
+		}
+		apply(ended)
+	}
+	return next, nil
+}
+
+func (cq *containerQueue) fetchAll(params arvados.ResourceListParams) ([]arvados.Container, error) {
+	var results []arvados.Container
+	params.Offset = 0
+	for {
+		// This list variable must be a new one declared
+		// inside the loop: otherwise, items in the API
+		// response would get deep-merged into the items
+		// loaded in previous iterations.
+		var list arvados.ContainerList
+
+		err := cq.Client.RequestAndDecode(&list, "GET", "arvados/v1/containers", nil, params)
+		if err != nil {
+			return nil, err
+		}
+		results = append(results, list.Items)
+		params.Offset += len(list.Items)
+		if len(list.Items) == 0 {
+			break
+		}
+	}
+	return results, nil
+}
diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
new file mode 100644
index 000000000..c24f56531
--- /dev/null
+++ b/lib/dispatchcloud/dispatcher.go
@@ -0,0 +1,57 @@
+package dispatchcloud
+
+import (
+	"net/http"
+
+	"git.curoverse.com/arvados.git/lib/cloud"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"github.com/Sirupsen/logrus"
+)
+
+type dispatcher struct {
+	Cluster     *arvados.Cluster
+	NodeProfile *arvados.NodeProfile
+
+	logger         logger
+	provider       cloud.Provider
+	workerPool     workerPool
+	queue          containerQueue
+	scheduler      scheduler
+	syncer         syncer
+	staleLockFixer staleLockFixer
+	httpHandler    http.Handler
+}
+
+// ServeHTTP implements service.Handler.
+func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	disp.setupOnce.Do(disp.setup)
+	disp.handler.ServeHTTP(w, r)
+}
+
+// CheckHealth implements service.Handler.
+func (disp *dispatcher) CheckHealth() error {
+	disp.setupOnce.Do(disp.setup)
+	return nil
+}
+
+func (disp *dispatcher) setup() {
+	disp.logger = logrus.StandardLogger()
+	disp.provider = &providerProxy{disp.logger, newProvider(disp.Cluster)}
+	disp.workerPool = &workerPool{disp.logger, disp.provider}
+	disp.queue = &containerQueue{disp.logger, disp.Cluster}
+	disp.scheduler = &scheduler{disp.logger, disp.queue, disp.workerPool}
+	disp.syncer = &syncer{disp.logger, disp.queue, disp.workerPool}
+	disp.staleLockFixer = &staleLockFixer{disp.logger, disp.queue, disp.workerPool}
+
+	go func() {
+		disp.workerPool.Start()
+		// staleLockFixer must be ready before scheduler can start
+		disp.staleLockFixer.Wait()
+		go disp.scheduler.Run()
+		go disp.syncer.Run()
+	}()
+
+	mux := http.NewServeMux()
+	mux.Handle("/status.json", disp.serveStatusJSON)
+	disp.httpHandler = mux
+}
diff --git a/lib/dispatchcloud/exclusive_worker.go b/lib/dispatchcloud/exclusive_worker.go
new file mode 100644
index 000000000..44d35e257
--- /dev/null
+++ b/lib/dispatchcloud/exclusive_worker.go
@@ -0,0 +1,26 @@
+package dispatchcloud
+
+import "sync"
+
+type exclusiveWorker struct {
+	worker
+
+	firstCall sync.Once
+	ready     chan struct{}
+}
+
+func (ew *exclusiveWorker) Running() []string {
+	ew.firstCall.Do(func() {
+		ei.ready = make(chan struct{})
+		go func() {
+			ew.worker.Running()
+			close(ew.ready)
+		}()
+	})
+	select {
+	case <-ew.ready:
+		return ew.worker.Running()
+	default:
+		return nil
+	}
+}
diff --git a/lib/dispatchcloud/logger.go b/lib/dispatchcloud/logger.go
new file mode 100644
index 000000000..0b6162b48
--- /dev/null
+++ b/lib/dispatchcloud/logger.go
@@ -0,0 +1,7 @@
+package dispatchcloud
+
+type logger interface {
+	Printf(string, ...interface{})
+	Warnf(string, ...interface{})
+	Debugf(string, ...interface{})
+}
diff --git a/lib/dispatchcloud/readme.go b/lib/dispatchcloud/readme.go
new file mode 100644
index 000000000..8f47d86b2
--- /dev/null
+++ b/lib/dispatchcloud/readme.go
@@ -0,0 +1,80 @@
+package dispatchcloud
+
+// A dispatcher comprises a container queue, a scheduler, a worker
+// pool, a cloud provider, a stale-lock fixer, and a syncer.
+// 1. Choose a provider.
+// 2. Start a worker pool.
+// 3. Start a container queue.
+// 4. Run a stale-lock fixer.
+// 5. Start a scheduler.
+// 6. Start a syncer.
+//
+//
+// A provider (cloud driver) creates new cloud VM instances and gets
+// the latest list of instances. The returned instances implement
+// proxies to the provider's metadata and control interfaces (get IP
+// address, update tags, shutdown).
+//
+//
+// A commander sets up an SSH connection to an instance, and
+// retries/reconnects when needed, so it is ready to execute remote
+// commands on behalf of callers.
+//
+//
+// A workerPool tracks workers' instance types and readiness states
+// (available to do work now, booting, suffering a temporary network
+// outage, shutting down). It loads internal state from the cloud
+// provider's list of instances at startup, and syncs periodically
+// after that.
+//
+//
+// A worker manages communication with a cloud instance, and notifies
+// the worker pool (using its Wait() method) when it becomes available
+// to do work after booting, finishing a container, emerging from a
+// network outage, etc.
+//
+//
+// An exclusiveWorker wraps a worker with an assurance that its
+// instance isn't running any containers started by another worker
+// pool. (Running() returns nil until the first call to the wrapped
+// worker's Running() finishes.)
+//
+//
+// A container queue tracks the known state (according to
+// arvados-controller) of each container of interest -- i.e., queued,
+// or locked/running using our own dispatch token. It also proxies the
+// dispatcher's lock/unlock/cancel requests to the controller. It
+// handles concurrent refresh and update operations without exposing
+// out-of-order updates to its callers. (It drops any new information
+// that might have originated before its own most recent
+// lock/unlock/cancel operation.)
+//
+//
+// A stale-lock fixer waits for any already-locked containers (i.e.,
+// locked by a prior server process) to appear on workers as the
+// worker pool recovers its state. It unlocks/requeues any that still
+// remain when all workers are recovered or shutdown, or its timer
+// expires.
+//
+//
+// A scheduler chooses which containers to assign to which idle
+// workers, and decides what to do when there are not enough idle
+// workers (including shutting down some idle nodes).
+//
+//
+// A syncer updates state to Cancelled when a running container
+// process dies without finalizing its entry in the controller
+// database. It also calls the worker pool to kill containers that
+// have priority=0 while locked or running.
+//
+//
+// A provider proxy wraps a provider with rate-limiting logic. After
+// the wrapped provider receives a cloud.RateLimitError, the proxy
+// starts returning errors to callers immediately without calling
+// through to the wrapped provider.
+//
+//
+// TBD: Bootstrapping script via SSH, too? Future version.
+//
+// TBD: drain instance, keep instance alive
+// TBD: metrics, diagnostics
diff --git a/lib/dispatchcloud/scheduler.go b/lib/dispatchcloud/scheduler.go
new file mode 100644
index 000000000..795b5a127
--- /dev/null
+++ b/lib/dispatchcloud/scheduler.go
@@ -0,0 +1,113 @@
+package dispatchcloud
+
+import (
+	"sort"
+	"sync"
+	"time"
+
+	"git.curoverse.com/arvados.git/lib/cloud"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+type container struct {
+	container  arvados.Container
+	wantsType  arvados.InstanceType
+	worker     worker // nil while queued
+	runningPID uint   // 0 if crunch-run supervisor process is not running
+}
+
+// A scheduler assigns queued containers to available workers, and
+// creates new workers when there aren't enough available.
+//
+// If it encounters problems creating new workers, the scheduler also
+// shuts down idle workers in case they are consuming quota.
+// Otherwise, workers are responsible for shutting themselves down
+// after the configured idle time threshold.
+type scheduler struct {
+	logger
+	containerQueue
+	workerPool
+
+	queue           []*container
+	queueMtx        sync.Mutex
+	instances       map[cloud.Instance]struct{}
+	enqueueOrUpdate chan container
+	runOnce         sync.Once
+}
+
+func (sched *scheduler) setup() {
+	sched.enqueueOrUpdate = make(chan container, 1)
+	go sched.run()
+}
+
+func (sched *scheduler) run() {
+	wakeup := make(chan struct{}, 1)
+	workers := map[*worker]*container{}
+	ctrs := map[string]*container{} // key is container UUID
+	timer := time.NewTimer(time.Second)
+	queue := []*container{}
+	for {
+		select {
+		case ctr := <-sched.enqueueOrUpdate:
+			// Get a newly queued container, or update
+			// priority/state.
+			if ctrs[ctr.UUID] == nil {
+				ctrs[ctr.UUID] = &ctr
+			} else {
+				ctrs[ctr.UUID].container = ctr.container
+				continue
+			}
+		case <-timer.C:
+		case <-wakeup:
+		}
+
+		queue = queue[:0]
+		for _, ctr := range ctrs {
+			if ctr.State == arvados.ContainerStateLocked {
+				queue = append(queue, ctr)
+			}
+		}
+		sort.Slice(queue, func(i, j int) bool {
+			if d := queue[i].Priority - queue[j].Priority; d != 0 {
+				return d > 0
+			} else {
+				return queue[i].UUID > queue[j].UUID
+			}
+		})
+
+		// Dispatch highest priority container to the idle
+		// worker with the shortest idle time.
+		for len(queue) > 0 {
+			select {
+			case todo[queue[0].wantsType] <- queue[0]:
+				queue = queue[1:]
+				continue
+			default:
+			}
+			break
+		}
+
+		// Compare queue to booting.
+	}
+}
+
+// DispatchFunc returns a dispatch.DispatchFunc.
+func (sched *scheduler) DispatchFunc(cluster *arvados.Cluster) func(actr arvados.Container, update <-chan arvados.Container) {
+	go sched.runOnce.Do(sched.run)
+	return func(actr arvados.Container, update <-chan arvados.Container) {
+		it, err := ChooseInstanceType(cluster, &actr)
+		if err != nil {
+			return err
+		}
+		sched.enqueueOrUpdate <- container{
+			container: actr,
+			wantsType: it,
+		}
+		for actr := range update {
+			sched.enqueueOrUpdate <- container{
+				container: actr,
+				wantsType: it,
+			}
+		}
+	}
+}
diff --git a/lib/dispatchcloud/ssh.go b/lib/dispatchcloud/ssh.go
new file mode 100644
index 000000000..3b8a1dc80
--- /dev/null
+++ b/lib/dispatchcloud/ssh.go
@@ -0,0 +1,127 @@
+package dispatchcloud
+
+import (
+	"bytes"
+	"errors"
+	"io"
+	"sync/atomic"
+	"time"
+
+	"golang.org/x/crypto/ssh"
+)
+
+type addressable interface {
+	String() string  // string representation for logging
+	Address() string // tcp address
+}
+
+type sshCommander struct {
+	logger    logger
+	host      addressable
+	client    atomic.Value // last *ssh.Client created
+	ready     chan bool    // closed if/when client gets a *ssh.Client
+	reconnect chan bool
+	closed    chan bool // closed when commander is Close()d
+}
+
+func newSSHCommander(logger logger, host addressable) *sshCommander {
+	sc := &sshCommander{
+		logger:    logger,
+		host:      host,
+		ready:     make(chan bool),
+		reconnect: make(chan bool, 1),
+		closed:    make(chan bool),
+	}
+	go sc.run()
+	return sc
+}
+
+func (sc *sshCommander) RunCommand(cmd string, stdin io.Reader) ([]byte, []byte, error) {
+	select {
+	case <-sc.ready:
+	case <-sc.closed:
+		return nil, nil, errors.New("commander is closed")
+	}
+	client := sc.client.Load().(*ssh.Client)
+	session, err := client.NewSession()
+	if err != nil {
+		return nil, nil, err
+	}
+	defer session.Close()
+	var stdout, stderr bytes.Buffer
+	session.Stdout = stdout
+	session.Stderr = stderr
+	if stdin != nil {
+		session.Stdin = stdin
+	}
+	sc.logger.Debugf("ssh: %s: running command: %s", sc.host, cmd)
+	err = session.Run(cmd)
+	return stdout.Bytes(), stderr.Bytes(), err
+}
+
+func (sc *sshCommander) Reconnect() {
+	select {
+	case sc.reconnect <- true:
+	default:
+	}
+}
+
+func (sc *sshCommander) Close() {
+	// Ensure a false value gets into the reconnect channel, even
+	// if it's full/filling with true values from other
+	// goroutines.
+	for {
+		select {
+		case sc.reconnect <- false:
+			return
+		case <-sc.reconnect:
+		}
+	}
+}
+
+func (sc *sshCommander) run() {
+	defer close(sc.closed)
+	var oldclient *ssh.Client
+	for {
+		select {
+		case alive := <-sc.reconnect:
+			if !alive {
+				return
+			}
+		default:
+		}
+		addr := sc.host.Address()
+		if addr == "" {
+			time.Sleep(time.Second)
+			continue
+		}
+		sc.logger.Debugf("ssh: %s: connecting to %s", sc.host, addr)
+		client, err := ssh.Dial("tcp", addr, &ssh.ClientConfig{
+			User: "root",
+			Auth: []ssh.AuthMethod{
+				ssh.Password("1234"),
+			},
+			// TODO: validate the host key
+			HostKeyCallback: ssh.InsecureIgnoreHostKey(),
+			Timeout:         time.Minute,
+		})
+		if err != nil {
+			sc.logger.Warnf("ssh: %s: connect failed: %s", sc.host, err)
+			time.Sleep(10 * time.Second)
+			continue
+		}
+
+		sc.client.Store(client)
+		if oldclient == nil {
+			close(sc.ready)
+		} else {
+			oldclient.Close()
+		}
+		oldclient = client
+		sc.logger.Debugf("ssh: %s: connected to %s", sc.host, addr)
+
+		if !<-sc.reconnect {
+			return
+		}
+	}
+}
diff --git a/lib/dispatchcloud/worker.go b/lib/dispatchcloud/worker.go
new file mode 100644
index 000000000..6b202a00d
--- /dev/null
+++ b/lib/dispatchcloud/worker.go
@@ -0,0 +1,170 @@
+package dispatchcloud
+
+import (
+	"errors"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"git.curoverse.com/arvados.git/lib/cloud"
+)
+
+// A worker wraps a cloud.Instance. It manages the instance's life
+// cycle (boot, monitor, shut down) and handles all communication
+// involved in starting and stopping containers.
+type worker interface {
+	Running() []string
+	Wait() bool
+	StartContainer(ctr *container) bool
+}
+
+// An sshWorker uses a cloud instance
+type sshWorker struct {
+	starts    uint64
+	instance  cloud.Instance
+	ready     bool
+	readyCond sync.Cond
+	running   []string
+	done      chan struct{} // closed if worker has terminated
+	err       error
+}
+
+// Wait returns true when the worker is ready to start a new
+// container, or false if the worker terminates before becoming ready.
+func (wkr *sshWorker) Wait() bool {
+	wkr.readyCond.L.Lock()
+	defer wkr.readyCond.L.Unlock()
+	for {
+		wkr.readyCond.Wait()
+		if wkr.ready {
+			return true
+		}
+		select {
+		case <-wkr.done:
+			return false
+		default:
+			continue
+		}
+	}
+}
+
+// Running returns the UUIDs of the containers currently running on
+// the instance.
+//
+// Running blocks until the instance asserts the VM is new (has never
+// started a container), the worker's first probe succeeds, or the
+// worker shuts down. After that, it returns instantaneously.
+//
+// A workerPool should use this to discover containers that are still
+// running after being abandoned by a previous server instantiation.
+func (wkr *sshWorker) Running() []string {
+	wkr.readyCond.L.Lock()
+	defer wkr.readyCond.L.Unlock()
+	for !wkr.ready {
+		wkr.readyCond.Wait()
+	}
+	return wkr.running
+}
+
+// StartContainer starts the given container on the worker and returns
+// true.
+//
+// If the container cannot be started now (typically because a failed
+// probe made it non-ready since the scheduler's last call to Wait()),
+// it returns false.
+func (wkr *sshWorker) StartContainer(ctr *container) bool {
+	wkr.readyCond.L.Lock()
+	defer wkr.readyCond.L.Unlock()
+	if !wkr.ready {
+		return false
+	}
+	wkr.ready = false
+	wkr.running = append(wkr.running, ctr.container.UUID)
+	atomic.AddUint64(&wkr.starts, 1)
+}
+
+// newSSHWorker creates a new sshWorker.
+//
+// In a new goroutine, the worker calls getInstance to get its backing
+// instance.
+//
+// Typically getInstance will either call (cloud.Provider)Create() or
+// return an existing instance that doesn't already belong to another
+// worker.
+func newSSHWorker(getInstance func() (cloud.Instance, error)) *sshWorker {
+	wkr := &sshWorker{
+		readyCond: sync.Cond{L: sync.Mutex{}},
+		done:      make(chan struct{}),
+	}
+	wkr.todo.Store((chan *container)(nil))
+	go func() {
+		inst, err := getInstance()
+		if err != nil {
+			wkr.err = err
+			close(wkr.done)
+			return
+		}
+		wkr.instance = inst
+		wkr.run()
+	}()
+	return wkr
+}
+
+// probeAndNotify calls probe() and takes any appropriate actions
+// based on the result.
+//
+// Possible actions include waking up goroutines in Wait(), printing
+// logs, and notifying the scheduler that a container is running.
+func (wkr *sshWorker) probeAndUpdate() {
+	starts := atomic.LoadUint64(&wkr.starts)
+	booted, uuids, err := wkr.probe(time.Minute / 2)
+	if err != nil {
+		wkr.readyCond.L.Lock()
+		wkr.ready = false
+		wkr.readyCond.L.Unlock()
+		wkr.logger.Infof("instance %s not responding for %s: %s", inst, time.Since(lastPing), err)
+		if time.Since(lastPing) > maxPingFailTime {
+			wkr.logger.Warnf("instance %s unresponsive since %s, shutting down", inst, lastPing)
+			err = inst.Destroy()
+			if err == nil {
+				cancel()
+			}
+		}
+		return
+	}
+	if !booted {
+		return
+	}
+	wkr.readyCond.L.Lock()
+	defer wkr.readyCond.L.Unlock()
+	if starts != wkr.starts {
+		// List of UUIDs is stale.
+		return
+	}
+	wkr.ready = true
+	wkr.running = uuids
+	wkr.readyCond.Broadcast()
+}
+
+var errWorkerTerminated = errors.New("worker terminated")
+
+func (wkr *sshWorker) run(inst cloud.Instance) {
+	defer func() { wkr.ready = true; wkr.readyCond.Broadcast() }()
+	defer close(wkr.done)
+
+	lastPing := time.Now()
+	maxPingFailTime := 10 * time.Minute
+	ticker := time.NewTicker(time.Minute)
+	defer ticker.Stop()
+
+	go wkr.probe(todo)
+	for {
+		var ok bool
+		select {
+		case <-ticker.C:
+			go wkr.probe(todo)
+		case ctr := <-todo:
+			// send "run container" command
+		}
+	}
+}
diff --git a/lib/dispatchcloud/worker_pool.go b/lib/dispatchcloud/worker_pool.go
new file mode 100644
index 000000000..23703491e
--- /dev/null
+++ b/lib/dispatchcloud/worker_pool.go
@@ -0,0 +1,138 @@
+package dispatchcloud
+
+import (
+	"io"
+	"sync"
+	"time"
+
+	"git.curoverse.com/arvados.git/lib/cloud"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"github.com/Sirupsen/logrus"
+)
+
+type commander interface {
+	RunCommand(cmd string, stdin io.Reader) (stdout []byte, stderr []byte, err error)
+	Reconnect()
+	Close()
+}
+
+type workerPool struct {
+	logger   *logrus.FieldLogger
+	provider cloud.Provider
+
+	workers   map[cloud.Instance]worker
+	available map[arvados.InstanceType][]worker
+	plan      map[*worker]*container
+	booted    chan struct{}
+	mtx       sync.RWMutex
+}
+
+func (wp *workerPool) setup() {
+	wp.workers = map[cloud.Instance]worker{}
+	wp.available = map[arvados.InstanceType][]worker{}
+	wp.plan = map[*worker]*container{}
+	wp.booted = make(chan struct{})
+
+	go wp.loadInstances()
+}
+
+func (wp *workerPool) loadInstances() {
+	wp.logger.Infof("worker pool: bootstrapping")
+
+	var instances []cloud.Instance
+	var err error
+	for {
+		instances, err = wp.provider.Instances()
+		if err == nil {
+			break
+		}
+		wait := 15 * time.Second
+		wp.logger.Warnf("worker pool: error getting instance list (retry in %s): %s", wait, err)
+		time.Sleep(wait)
+	}
+
+	var wg sync.WaitGroup
+	for _, inst := range instances {
+		inst := inst
+		itype := taggedInstanceType(inst)
+		worker := newSSHWorker(func() (cloud.Instance, error) { return inst, nil })
+		if wp.workers[itype] == nil {
+			wp.workers[itype] = map[worker]bool{}
+		}
+		wp.workers[itype][worker] = false
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			worker.Running()
+			wp.mtx.Lock()
+			if wp.workers[itype] == nil {
+				wp.workers[itype] = map[worker]bool{}
+			}
+			wp.workers[itype][worker] = true
+			wp.mtx.Unlock()
+		}()
+	}
+	go func() {
+		wg.Wait()
+		wp.logger.Infof("worker pool: bootstrapping done")
+		close(wp.booted)
+	}()
+
+	wait := time.Minute
+	for {
+		wp.logger.Debugf("workerPool: wait %s", wait)
+		time.Sleep(wait)
+		wp.logger.Debugf("workerPool: getting instance list")
+		instances, err := wp.provider.Instances()
+		if err != nil {
+			wp.logger.Warnf("workerPool: error getting instance list: %s", err)
+			continue
+		}
+		for _, inst := range instances {
+			wp.mtx.Lock()
+			if wp.workers[itype] == nil {
+				wp.workers[itype] = map[worker]bool{}
+			}
+			wp.workers[itype][worker] = true
+			wp.mtx.Unlock()
+		}
+	}
+}
+
+func (wp *workerPool) Start() {
+	wp.setupOnce.Do(wp.setup)
+}
+
+// Available returns the number of non-allocated (i.e., booting or
+// idle) instances of the given type.
+func (wp *workerPool) Available(it arvados.InstanceType) int {
+	wp.Start()
+	wp.mtx.RLock()
+	defer wp.mtx.RUnlock()
+	return len(wp.workers[it])
+}
+
+// Create creates a new instance with the given type, and adds it to
+// the worker pool.
+func (wp *workerPool) Create(it arvados.InstanceType) error {
+	wp.Start()
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
+	// ...
+}
+
+// Shutdown shuts down a worker with the given type, or returns false
+// if all workers with the given type are busy.
+func (wp *workerPool) Shutdown(it arvados.InstanceType) bool {
+	wp.Start()
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
+	// ...
+}
+
+// Start starts a container on an idle node immediately if possible,
+// otherwise returns false.
+func (wp *workerPool) Start(ctr container) bool {
+	wp.Start()
+	// ...
+}
diff --git a/sdk/go/arvados/container.go b/sdk/go/arvados/container.go
index 210ed9981..ef3547048 100644
--- a/sdk/go/arvados/container.go
+++ b/sdk/go/arvados/container.go
@@ -18,7 +18,7 @@ type Container struct {
 	Mounts               map[string]Mount     `json:"mounts"`
 	Output               string               `json:"output"`
 	OutputPath           string               `json:"output_path"`
-	Priority             int                  `json:"priority"`
+	Priority             int64                `json:"priority"`
 	RuntimeConstraints   RuntimeConstraints   `json:"runtime_constraints"`
 	State                ContainerState       `json:"state"`
 	SchedulingParameters SchedulingParameters `json:"scheduling_parameters"`
-----------------------------------------------------------------------
hooks/post-receive
-- 
    
    
More information about the arvados-commits
mailing list