[ARVADOS] updated: 44e01cf266a3c062b2f0f5bb3426672024367d38

git at public.curoverse.com git at public.curoverse.com
Wed Oct 8 16:52:34 EDT 2014


Summary of changes:
 services/nodemanager/.gitignore                    |   4 +
 services/nodemanager/arvnodeman/__init__.py        |   9 +
 services/nodemanager/arvnodeman/clientactor.py     |  96 ++++++
 .../nodemanager/arvnodeman/computenode/__init__.py | 383 +++++++++++++++++++++
 .../nodemanager/arvnodeman/computenode/dummy.py    |  52 +++
 services/nodemanager/arvnodeman/computenode/ec2.py | 101 ++++++
 services/nodemanager/arvnodeman/config.py          | 108 ++++++
 services/nodemanager/arvnodeman/daemon.py          | 294 ++++++++++++++++
 services/nodemanager/arvnodeman/jobqueue.py        |  96 ++++++
 services/nodemanager/arvnodeman/launcher.py        | 130 +++++++
 services/nodemanager/arvnodeman/nodelist.py        |  39 +++
 services/nodemanager/arvnodeman/timedcallback.py   |  41 +++
 services/nodemanager/bin/arvados-node-manager      |   6 +
 services/nodemanager/doc/ec2.example.cfg           | 121 +++++++
 services/nodemanager/doc/local.example.cfg         |  41 +++
 services/{fuse => nodemanager}/setup.py            |  26 +-
 services/{fuse => nodemanager}/tests/__init__.py   |   0
 services/nodemanager/tests/test_clientactor.py     | 127 +++++++
 services/nodemanager/tests/test_computenode.py     | 272 +++++++++++++++
 services/nodemanager/tests/test_computenode_ec2.py |  89 +++++
 services/nodemanager/tests/test_config.py          |  65 ++++
 services/nodemanager/tests/test_daemon.py          | 158 +++++++++
 services/nodemanager/tests/test_jobqueue.py        |  74 ++++
 services/nodemanager/tests/test_nodelist.py        |  56 +++
 services/nodemanager/tests/test_timedcallback.py   |  55 +++
 services/nodemanager/tests/testutil.py             |  81 +++++
 26 files changed, 2508 insertions(+), 16 deletions(-)
 create mode 100644 services/nodemanager/.gitignore
 create mode 100644 services/nodemanager/arvnodeman/__init__.py
 create mode 100644 services/nodemanager/arvnodeman/clientactor.py
 create mode 100644 services/nodemanager/arvnodeman/computenode/__init__.py
 create mode 100644 services/nodemanager/arvnodeman/computenode/dummy.py
 create mode 100644 services/nodemanager/arvnodeman/computenode/ec2.py
 create mode 100644 services/nodemanager/arvnodeman/config.py
 create mode 100644 services/nodemanager/arvnodeman/daemon.py
 create mode 100644 services/nodemanager/arvnodeman/jobqueue.py
 create mode 100644 services/nodemanager/arvnodeman/launcher.py
 create mode 100644 services/nodemanager/arvnodeman/nodelist.py
 create mode 100644 services/nodemanager/arvnodeman/timedcallback.py
 create mode 100644 services/nodemanager/bin/arvados-node-manager
 create mode 100644 services/nodemanager/doc/ec2.example.cfg
 create mode 100644 services/nodemanager/doc/local.example.cfg
 copy services/{fuse => nodemanager}/setup.py (63%)
 copy services/{fuse => nodemanager}/tests/__init__.py (100%)
 create mode 100644 services/nodemanager/tests/test_clientactor.py
 create mode 100644 services/nodemanager/tests/test_computenode.py
 create mode 100644 services/nodemanager/tests/test_computenode_ec2.py
 create mode 100644 services/nodemanager/tests/test_config.py
 create mode 100644 services/nodemanager/tests/test_daemon.py
 create mode 100644 services/nodemanager/tests/test_jobqueue.py
 create mode 100644 services/nodemanager/tests/test_nodelist.py
 create mode 100644 services/nodemanager/tests/test_timedcallback.py
 create mode 100644 services/nodemanager/tests/testutil.py

       via  44e01cf266a3c062b2f0f5bb3426672024367d38 (commit)
       via  a5687a390262abebfc16cf21e62052ac0019512d (commit)
      from  4be23b41ec561b404dd833bdbea9d764f2b5d027 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 44e01cf266a3c062b2f0f5bb3426672024367d38
Merge: 4be23b4 a5687a3
Author: Brett Smith <brett at curoverse.com>
Date:   Wed Oct 8 16:52:44 2014 -0400

    Merge branch '2881-node-manager'
    
    Closes #2881, #4106.


commit a5687a390262abebfc16cf21e62052ac0019512d
Author: Brett Smith <brett at curoverse.com>
Date:   Fri Oct 3 17:53:57 2014 -0400

    2881: Add Node Manager service.

diff --git a/services/nodemanager/.gitignore b/services/nodemanager/.gitignore
new file mode 100644
index 0000000..488ddd5
--- /dev/null
+++ b/services/nodemanager/.gitignore
@@ -0,0 +1,4 @@
+*.pyc
+*.egg-info
+build/
+dist/
diff --git a/services/nodemanager/arvnodeman/__init__.py b/services/nodemanager/arvnodeman/__init__.py
new file mode 100644
index 0000000..a1ecac7
--- /dev/null
+++ b/services/nodemanager/arvnodeman/__init__.py
@@ -0,0 +1,9 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import _strptime  # See <http://bugs.python.org/issue7980#msg221094>.
+import logging
+
+logger = logging.getLogger('arvnodeman')
+logger.addHandler(logging.NullHandler())
diff --git a/services/nodemanager/arvnodeman/clientactor.py b/services/nodemanager/arvnodeman/clientactor.py
new file mode 100644
index 0000000..77d85d6
--- /dev/null
+++ b/services/nodemanager/arvnodeman/clientactor.py
@@ -0,0 +1,96 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import logging
+import time
+
+import pykka
+
+from .config import actor_class
+
+def _notify_subscribers(response, subscribers):
+    """Send the response to all the subscriber methods.
+
+    If any of the subscriber actors have stopped, remove them from the
+    subscriber set.
+    """
+    dead_subscribers = set()
+    for subscriber in subscribers:
+        try:
+            subscriber(response)
+        except pykka.ActorDeadError:
+            dead_subscribers.add(subscriber)
+    subscribers.difference_update(dead_subscribers)
+
+class RemotePollLoopActor(actor_class):
+    """Abstract actor class to regularly poll a remote service.
+
+    This actor sends regular requests to a remote service, and sends each
+    response to subscribers.  It takes care of error handling, and retrying
+    requests with exponential backoff.
+
+    To use this actor, define CLIENT_ERRORS and the _send_request method.
+    If you also define an _item_key method, this class will support
+    subscribing to a specific item by key in responses.
+    """
+    def __init__(self, client, timer_actor, poll_wait=60, max_poll_wait=180):
+        super(RemotePollLoopActor, self).__init__()
+        self._client = client
+        self._timer = timer_actor
+        self._logger = logging.getLogger(self.LOGGER_NAME)
+        self._later = self.actor_ref.proxy()
+        self.min_poll_wait = poll_wait
+        self.max_poll_wait = max_poll_wait
+        self.poll_wait = self.min_poll_wait
+        self.last_poll_time = None
+        self.all_subscribers = set()
+        self.key_subscribers = {}
+        if hasattr(self, '_item_key'):
+            self.subscribe_to = self._subscribe_to
+
+    def _start_polling(self):
+        if self.last_poll_time is None:
+            self.last_poll_time = time.time()
+            self._later.poll()
+
+    def subscribe(self, subscriber):
+        self.all_subscribers.add(subscriber)
+        self._logger.debug("%r subscribed to all events", subscriber)
+        self._start_polling()
+
+    # __init__ exposes this method to the proxy if the subclass defines
+    # _item_key.
+    def _subscribe_to(self, key, subscriber):
+        self.key_subscribers.setdefault(key, set()).add(subscriber)
+        self._logger.debug("%r subscribed to events for '%s'", subscriber, key)
+        self._start_polling()
+
+    def _send_request(self):
+        raise NotImplementedError("subclasses must implement request method")
+
+    def _got_response(self, response):
+        self.poll_wait = self.min_poll_wait
+        _notify_subscribers(response, self.all_subscribers)
+        if hasattr(self, '_item_key'):
+            items = {self._item_key(x): x for x in response}
+            for key, subscribers in self.key_subscribers.iteritems():
+                _notify_subscribers(items.get(key), subscribers)
+
+    def _got_error(self, error):
+        self.poll_wait = min(self.poll_wait * 2, self.max_poll_wait)
+        self._logger.warning("Client error: %s - waiting %s seconds",
+                             error, self.poll_wait)
+
+    def poll(self):
+        start_time = time.time()
+        try:
+            response = self._send_request()
+        except self.CLIENT_ERRORS as error:
+            self.last_poll_time = start_time
+            self._got_error(error)
+        else:
+            self.last_poll_time += self.poll_wait
+            self._got_response(response)
+        self._timer.schedule(self.last_poll_time + self.poll_wait,
+                             self._later.poll)
diff --git a/services/nodemanager/arvnodeman/computenode/__init__.py b/services/nodemanager/arvnodeman/computenode/__init__.py
new file mode 100644
index 0000000..ae25428
--- /dev/null
+++ b/services/nodemanager/arvnodeman/computenode/__init__.py
@@ -0,0 +1,383 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import functools
+import itertools
+import logging
+import time
+
+import pykka
+
+from ..clientactor import _notify_subscribers
+from .. import config
+
+def arvados_node_fqdn(arvados_node, default_hostname='dynamic.compute'):
+    hostname = arvados_node.get('hostname') or default_hostname
+    return '{}.{}'.format(hostname, arvados_node['domain'])
+
+def arvados_node_mtime(node):
+    return time.mktime(time.strptime(node['modified_at'] + 'UTC',
+                                     '%Y-%m-%dT%H:%M:%SZ%Z')) - time.timezone
+
+def timestamp_fresh(timestamp, fresh_time):
+    return (time.time() - timestamp) < fresh_time
+
+def _retry(errors):
+    """Retry decorator for an actor method that makes remote requests.
+
+    Use this function to decorate an actor method, and pass in a tuple of
+    exceptions to catch.  This decorator will schedule retries of that method
+    with exponential backoff if the original method raises any of the given
+    errors.
+    """
+    def decorator(orig_func):
+        @functools.wraps(orig_func)
+        def wrapper(self, *args, **kwargs):
+            try:
+                orig_func(self, *args, **kwargs)
+            except errors as error:
+                self._logger.warning(
+                    "Client error: %s - waiting %s seconds",
+                    error, self.retry_wait)
+                self._timer.schedule(self.retry_wait,
+                                     getattr(self._later, orig_func.__name__),
+                                     *args, **kwargs)
+                self.retry_wait = min(self.retry_wait * 2,
+                                      self.max_retry_wait)
+            else:
+                self.retry_wait = self.min_retry_wait
+        return wrapper
+    return decorator
+
+class BaseComputeNodeDriver(object):
+    """Abstract base class for compute node drivers.
+
+    libcloud abstracts away many of the differences between cloud providers,
+    but managing compute nodes requires some cloud-specific features (e.g.,
+    on EC2 we use tags to identify compute nodes).  Compute node drivers
+    are responsible for translating the node manager's cloud requests to a
+    specific cloud's vocabulary.
+
+    Subclasses must implement arvados_create_kwargs (to update node
+    creation kwargs with information about the specific Arvados node
+    record), sync_node, and node_start_time.
+    """
+    def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class):
+        self.real = driver_class(**auth_kwargs)
+        self.list_kwargs = list_kwargs
+        self.create_kwargs = create_kwargs
+
+    def __getattr__(self, name):
+        # Proxy non-extension methods to the real driver.
+        if (not name.startswith('_') and not name.startswith('ex_')
+              and hasattr(self.real, name)):
+            return getattr(self.real, name)
+        else:
+            return super(BaseComputeNodeDriver, self).__getattr__(name)
+
+    def search_for(self, term, list_method, key=lambda item: item.id):
+        cache_key = (list_method, term)
+        if cache_key not in self.SEARCH_CACHE:
+            results = [item for item in getattr(self.real, list_method)()
+                       if key(item) == term]
+            count = len(results)
+            if count != 1:
+                raise ValueError("{} returned {} results for '{}'".format(
+                        list_method, count, term))
+            self.SEARCH_CACHE[cache_key] = results[0]
+        return self.SEARCH_CACHE[cache_key]
+
+    def list_nodes(self):
+        return self.real.list_nodes(**self.list_kwargs)
+
+    def arvados_create_kwargs(self, arvados_node):
+        raise NotImplementedError("BaseComputeNodeDriver.arvados_create_kwargs")
+
+    def create_node(self, size, arvados_node):
+        kwargs = self.create_kwargs.copy()
+        kwargs.update(self.arvados_create_kwargs(arvados_node))
+        kwargs['size'] = size
+        return self.real.create_node(**kwargs)
+
+    def sync_node(self, cloud_node, arvados_node):
+        # When a compute node first pings the API server, the API server
+        # will automatically assign some attributes on the corresponding
+        # node record, like hostname.  This method should propagate that
+        # information back to the cloud node appropriately.
+        raise NotImplementedError("BaseComputeNodeDriver.sync_node")
+
+    @classmethod
+    def node_start_time(cls, node):
+        raise NotImplementedError("BaseComputeNodeDriver.node_start_time")
+
+
+ComputeNodeDriverClass = BaseComputeNodeDriver
+
+class ComputeNodeSetupActor(config.actor_class):
+    """Actor to create and set up a cloud compute node.
+
+    This actor prepares an Arvados node record for a new compute node
+    (either creating one or cleaning one passed in), then boots the
+    actual compute node.  It notifies subscribers when the cloud node
+    is successfully created (the last step in the process for Node
+    Manager to handle).
+    """
+    def __init__(self, timer_actor, arvados_client, cloud_client,
+                 cloud_size, arvados_node=None,
+                 retry_wait=1, max_retry_wait=180):
+        super(ComputeNodeSetupActor, self).__init__()
+        self._timer = timer_actor
+        self._arvados = arvados_client
+        self._cloud = cloud_client
+        self._later = self.actor_ref.proxy()
+        self._logger = logging.getLogger('arvnodeman.nodeup')
+        self.cloud_size = cloud_size
+        self.subscribers = set()
+        self.min_retry_wait = retry_wait
+        self.max_retry_wait = max_retry_wait
+        self.retry_wait = retry_wait
+        self.arvados_node = None
+        self.cloud_node = None
+        if arvados_node is None:
+            self._later.create_arvados_node()
+        else:
+            self._later.prepare_arvados_node(arvados_node)
+
+    @_retry(config.ARVADOS_ERRORS)
+    def create_arvados_node(self):
+        self.arvados_node = self._arvados.nodes().create(body={}).execute()
+        self._later.create_cloud_node()
+
+    @_retry(config.ARVADOS_ERRORS)
+    def prepare_arvados_node(self, node):
+        self.arvados_node = self._arvados.nodes().update(
+            uuid=node['uuid'],
+            body={'hostname': None,
+                  'ip_address': None,
+                  'slot_number': None,
+                  'first_ping_at': None,
+                  'last_ping_at': None,
+                  'info': {'ec2_instance_id': None,
+                           'last_action': "Prepared by Node Manager"}}
+            ).execute()
+        self._later.create_cloud_node()
+
+    @_retry(config.CLOUD_ERRORS)
+    def create_cloud_node(self):
+        self._logger.info("Creating cloud node with size %s.",
+                          self.cloud_size.name)
+        self.cloud_node = self._cloud.create_node(self.cloud_size,
+                                                  self.arvados_node)
+        self._logger.info("Cloud node %s created.", self.cloud_node.id)
+        _notify_subscribers(self._later, self.subscribers)
+        self.subscribers = None
+
+    def stop_if_no_cloud_node(self):
+        if self.cloud_node is None:
+            self.stop()
+
+    def subscribe(self, subscriber):
+        if self.subscribers is None:
+            try:
+                subscriber(self._later)
+            except pykka.ActorDeadError:
+                pass
+        else:
+            self.subscribers.add(subscriber)
+
+
+class ComputeNodeShutdownActor(config.actor_class):
+    """Actor to shut down a compute node.
+
+    This actor simply destroys a cloud node, retrying as needed.
+    """
+    def __init__(self, timer_actor, cloud_client, cloud_node,
+                 retry_wait=1, max_retry_wait=180):
+        super(ComputeNodeShutdownActor, self).__init__()
+        self._timer = timer_actor
+        self._cloud = cloud_client
+        self._later = self.actor_ref.proxy()
+        self._logger = logging.getLogger('arvnodeman.nodedown')
+        self.cloud_node = cloud_node
+        self.min_retry_wait = retry_wait
+        self.max_retry_wait = max_retry_wait
+        self.retry_wait = retry_wait
+        self._later.shutdown_node()
+
+    @_retry(config.CLOUD_ERRORS)
+    def shutdown_node(self):
+        self._cloud.destroy_node(self.cloud_node)
+        self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
+
+
+class ComputeNodeUpdateActor(config.actor_class):
+    """Actor to dispatch one-off cloud management requests.
+
+    This actor receives requests for small cloud updates, and
+    dispatches them to a real driver.  ComputeNodeMonitorActors use
+    this to perform maintenance tasks on themselves.  Having a
+    dedicated actor for this gives us the opportunity to control the
+    flow of requests; e.g., by backing off when errors occur.
+
+    This actor is most like a "traditional" Pykka actor: there's no
+    subscribing, but instead methods return real driver results.  If
+    you're interested in those results, you should get them from the
+    Future that the proxy method returns.  Be prepared to handle exceptions
+    from the cloud driver when you do.
+    """
+    def __init__(self, cloud_factory, max_retry_wait=180):
+        super(ComputeNodeUpdateActor, self).__init__()
+        self._cloud = cloud_factory()
+        self.max_retry_wait = max_retry_wait
+        self.error_streak = 0
+        self.next_request_time = time.time()
+
+    def _throttle_errors(orig_func):
+        @functools.wraps(orig_func)
+        def wrapper(self, *args, **kwargs):
+            throttle_time = self.next_request_time - time.time()
+            if throttle_time > 0:
+                time.sleep(throttle_time)
+            self.next_request_time = time.time()
+            try:
+                result = orig_func(self, *args, **kwargs)
+            except config.CLOUD_ERRORS:
+                self.error_streak += 1
+                self.next_request_time += min(2 ** self.error_streak,
+                                              self.max_retry_wait)
+                raise
+            else:
+                self.error_streak = 0
+                return result
+        return wrapper
+
+    @_throttle_errors
+    def sync_node(self, cloud_node, arvados_node):
+        return self._cloud.sync_node(cloud_node, arvados_node)
+
+
+class ShutdownTimer(object):
+    """Keep track of a cloud node's shutdown windows.
+
+    Instantiate this class with a timestamp of when a cloud node started,
+    and a list of durations (in minutes) of when the node must not and may
+    be shut down, alternating.  The class will tell you when a shutdown
+    window is open, and when the next open window will start.
+    """
+    def __init__(self, start_time, shutdown_windows):
+        # The implementation is easiest if we have an even number of windows,
+        # because then windows always alternate between open and closed.
+        # Rig that up: calculate the first shutdown window based on what's
+        # passed in.  Then, if we were given an odd number of windows, merge
+        # that first window into the last one, since they both represent
+        # closed state.
+        first_window = shutdown_windows[0]
+        shutdown_windows = list(shutdown_windows[1:])
+        self._next_opening = start_time + (60 * first_window)
+        if len(shutdown_windows) % 2:
+            shutdown_windows.append(first_window)
+        else:
+            shutdown_windows[-1] += first_window
+        self.shutdown_windows = itertools.cycle([60 * n
+                                                 for n in shutdown_windows])
+        self._open_start = self._next_opening
+        self._open_for = next(self.shutdown_windows)
+
+    def _advance_opening(self):
+        while self._next_opening < time.time():
+            self._open_start = self._next_opening
+            self._next_opening += self._open_for + next(self.shutdown_windows)
+            self._open_for = next(self.shutdown_windows)
+
+    def next_opening(self):
+        self._advance_opening()
+        return self._next_opening
+
+    def window_open(self):
+        self._advance_opening()
+        return 0 < (time.time() - self._open_start) < self._open_for
+
+
+class ComputeNodeMonitorActor(config.actor_class):
+    """Actor to manage a running compute node.
+
+    This actor gets updates about a compute node's cloud and Arvados records.
+    It uses this information to notify subscribers when the node is eligible
+    for shutdown.
+    """
+    def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
+                 timer_actor, update_actor, arvados_node=None,
+                 poll_stale_after=600, node_stale_after=3600):
+        super(ComputeNodeMonitorActor, self).__init__()
+        self._later = self.actor_ref.proxy()
+        self._logger = logging.getLogger('arvnodeman.computenode')
+        self._last_log = None
+        self._shutdowns = shutdown_timer
+        self._timer = timer_actor
+        self._update = update_actor
+        self.cloud_node = cloud_node
+        self.cloud_node_start_time = cloud_node_start_time
+        self.poll_stale_after = poll_stale_after
+        self.node_stale_after = node_stale_after
+        self.subscribers = set()
+        self.arvados_node = None
+        self._later.update_arvados_node(arvados_node)
+        self.last_shutdown_opening = None
+        self._later.consider_shutdown()
+
+    def subscribe(self, subscriber):
+        self.subscribers.add(subscriber)
+
+    def _debug(self, msg, *args):
+        if msg == self._last_log:
+            return
+        self._last_log = msg
+        self._logger.debug(msg, *args)
+
+    def _shutdown_eligible(self):
+        if self.arvados_node is None:
+            return timestamp_fresh(self.cloud_node_start_time,
+                                   self.node_stale_after)
+        else:
+            return (timestamp_fresh(arvados_node_mtime(self.arvados_node),
+                                    self.poll_stale_after) and
+                    (self.arvados_node['info'].get('slurm_state') == 'idle'))
+
+    def consider_shutdown(self):
+        next_opening = self._shutdowns.next_opening()
+        if self._shutdowns.window_open():
+            if self._shutdown_eligible():
+                self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
+                _notify_subscribers(self._later, self.subscribers)
+            else:
+                self._debug("Node %s shutdown window open but node busy.",
+                            self.cloud_node.id)
+        else:
+            self._debug("Node %s shutdown window closed.  Next at %s.",
+                        self.cloud_node.id, time.ctime(next_opening))
+        if self.last_shutdown_opening != next_opening:
+            self._timer.schedule(next_opening, self._later.consider_shutdown)
+            self.last_shutdown_opening = next_opening
+
+    def offer_arvados_pair(self, arvados_node):
+        if self.arvados_node is not None:
+            return None
+        elif arvados_node['ip_address'] in self.cloud_node.private_ips:
+            self._later.update_arvados_node(arvados_node)
+            return self.cloud_node.id
+        else:
+            return None
+
+    def update_cloud_node(self, cloud_node):
+        if cloud_node is not None:
+            self.cloud_node = cloud_node
+            self._later.consider_shutdown()
+
+    def update_arvados_node(self, arvados_node):
+        if arvados_node is not None:
+            self.arvados_node = arvados_node
+            new_hostname = arvados_node_fqdn(self.arvados_node)
+            if new_hostname != self.cloud_node.name:
+                self._update.sync_node(self.cloud_node, self.arvados_node)
+            self._later.consider_shutdown()
diff --git a/services/nodemanager/arvnodeman/computenode/dummy.py b/services/nodemanager/arvnodeman/computenode/dummy.py
new file mode 100644
index 0000000..6c39fea
--- /dev/null
+++ b/services/nodemanager/arvnodeman/computenode/dummy.py
@@ -0,0 +1,52 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import time
+
+import libcloud.compute.providers as cloud_provider
+import libcloud.compute.types as cloud_types
+
+from . import BaseComputeNodeDriver, arvados_node_fqdn
+
+class ComputeNodeDriver(BaseComputeNodeDriver):
+    """Compute node driver wrapper for libcloud's dummy driver.
+
+    This class provides the glue necessary to run the node manager with a
+    dummy cloud.  It's useful for testing.
+    """
+    DEFAULT_DRIVER = cloud_provider.get_driver(cloud_types.Provider.DUMMY)
+    DEFAULT_REAL = DEFAULT_DRIVER('ComputeNodeDriver')
+    DUMMY_START_TIME = time.time()
+
+    def __init__(self, auth_kwargs, list_kwargs, create_kwargs,
+                 driver_class=DEFAULT_DRIVER):
+        super(ComputeNodeDriver, self).__init__(
+            auth_kwargs, list_kwargs, create_kwargs, driver_class)
+        if driver_class is self.DEFAULT_DRIVER:
+            self.real = self.DEFAULT_REAL
+
+    def _ensure_private_ip(self, node):
+        if not node.private_ips:
+            node.private_ips = ['10.10.0.{}'.format(node.id)]
+
+    def arvados_create_kwargs(self, arvados_node):
+        return {}
+
+    def list_nodes(self):
+        nodelist = super(ComputeNodeDriver, self).list_nodes()
+        for node in nodelist:
+            self._ensure_private_ip(node)
+        return nodelist
+
+    def create_node(self, size, arvados_node):
+        node = super(ComputeNodeDriver, self).create_node(size, arvados_node)
+        self._ensure_private_ip(node)
+        return node
+
+    def sync_node(self, cloud_node, arvados_node):
+        cloud_node.name = arvados_node_fqdn(arvados_node)
+
+    @classmethod
+    def node_start_time(cls, node):
+        return cls.DUMMY_START_TIME
diff --git a/services/nodemanager/arvnodeman/computenode/ec2.py b/services/nodemanager/arvnodeman/computenode/ec2.py
new file mode 100644
index 0000000..359bed4
--- /dev/null
+++ b/services/nodemanager/arvnodeman/computenode/ec2.py
@@ -0,0 +1,101 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import time
+
+import libcloud.compute.base as cloud_base
+import libcloud.compute.providers as cloud_provider
+import libcloud.compute.types as cloud_types
+from libcloud.compute.drivers import ec2 as cloud_ec2
+
+from . import BaseComputeNodeDriver, arvados_node_fqdn
+
+### Monkeypatch libcloud to support AWS' new SecurityGroup API.
+# These classes can be removed when libcloud supports specifying
+# security groups with the SecurityGroupId parameter.
+class ANMEC2Connection(cloud_ec2.EC2Connection):
+    def request(self, *args, **kwargs):
+        params = kwargs.get('params')
+        if (params is not None) and (params.get('Action') == 'RunInstances'):
+            for key in params.keys():
+                if key.startswith('SecurityGroup.'):
+                    new_key = key.replace('Group.', 'GroupId.', 1)
+                    params[new_key] = params.pop(key).id
+            kwargs['params'] = params
+        return super(ANMEC2Connection, self).request(*args, **kwargs)
+
+
+class ANMEC2NodeDriver(cloud_ec2.EC2NodeDriver):
+    connectionCls = ANMEC2Connection
+
+
+class ComputeNodeDriver(BaseComputeNodeDriver):
+    """Compute node driver wrapper for EC2.
+
+    This translates cloud driver requests to EC2's specific parameters.
+    """
+    DEFAULT_DRIVER = ANMEC2NodeDriver
+### End monkeypatch
+    SEARCH_CACHE = {}
+
+    def __init__(self, auth_kwargs, list_kwargs, create_kwargs,
+                 driver_class=DEFAULT_DRIVER):
+        # We need full lists of keys up front because these loops modify
+        # dictionaries in-place.
+        for key in list_kwargs.keys():
+            list_kwargs[key.replace('_', ':')] = list_kwargs.pop(key)
+        self.tags = {key[4:]: value
+                     for key, value in list_kwargs.iteritems()
+                     if key.startswith('tag:')}
+        super(ComputeNodeDriver, self).__init__(
+            auth_kwargs, {'ex_filters': list_kwargs}, create_kwargs,
+            driver_class)
+        for key in self.create_kwargs.keys():
+            init_method = getattr(self, '_init_' + key, None)
+            if init_method is not None:
+                new_pair = init_method(self.create_kwargs.pop(key))
+                if new_pair is not None:
+                    self.create_kwargs[new_pair[0]] = new_pair[1]
+
+    def _init_image_id(self, image_id):
+        return 'image', self.search_for(image_id, 'list_images')
+
+    def _init_ping_host(self, ping_host):
+        self.ping_host = ping_host
+
+    def _init_security_groups(self, group_names):
+        return 'ex_security_groups', [
+            self.search_for(gname.strip(), 'ex_get_security_groups')
+            for gname in group_names.split(',')]
+
+    def _init_subnet_id(self, subnet_id):
+        return 'ex_subnet', self.search_for(subnet_id, 'ex_list_subnets')
+
+    def _init_ssh_key(self, filename):
+        with open(filename) as ssh_file:
+            key = cloud_base.NodeAuthSSHKey(ssh_file.read())
+        return 'auth', key
+
+    def arvados_create_kwargs(self, arvados_node):
+        result = {'ex_metadata': self.tags.copy(),
+                  'name': arvados_node_fqdn(arvados_node)}
+        ping_secret = arvados_node['info'].get('ping_secret')
+        if ping_secret is not None:
+            ping_url = ('https://{}/arvados/v1/nodes/{}/ping?ping_secret={}'.
+                        format(self.ping_host, arvados_node['uuid'],
+                               ping_secret))
+            result['ex_userdata'] = ping_url
+        return result
+
+    def sync_node(self, cloud_node, arvados_node):
+        metadata = self.arvados_create_kwargs(arvados_node)
+        tags = metadata['ex_metadata']
+        tags['Name'] = metadata['name']
+        self.real.ex_create_tags(cloud_node, tags)
+
+    @classmethod
+    def node_start_time(cls, node):
+        time_str = node.extra['launch_time'].split('.', 2)[0] + 'UTC'
+        return time.mktime(time.strptime(
+                time_str,'%Y-%m-%dT%H:%M:%S%Z')) - time.timezone
diff --git a/services/nodemanager/arvnodeman/config.py b/services/nodemanager/arvnodeman/config.py
new file mode 100644
index 0000000..07504e2
--- /dev/null
+++ b/services/nodemanager/arvnodeman/config.py
@@ -0,0 +1,108 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import ConfigParser
+import importlib
+import logging
+import ssl
+
+import apiclient.errors as apierror
+import arvados
+import httplib2
+import libcloud.common.types as cloud_types
+import pykka
+
+# IOError is the base class for socket.error and friends.
+# It seems like it hits the sweet spot for operations we want to retry:
+# it's low-level, but unlikely to catch code bugs.
+NETWORK_ERRORS = (IOError, ssl.SSLError)
+ARVADOS_ERRORS = NETWORK_ERRORS + (apierror.Error,)
+CLOUD_ERRORS = NETWORK_ERRORS + (cloud_types.LibcloudError,)
+
+actor_class = pykka.ThreadingActor
+
+class NodeManagerConfig(ConfigParser.SafeConfigParser):
+    """Node Manager Configuration class.
+
+    This is a standard Python ConfigParser, with additional helper methods to
+    create objects instantiated with configuration information.
+    """
+
+    # [Logging] options that name things other than log levels
+    # (see log_levels()).
+    LOGGING_NONLEVELS = frozenset(['file'])
+
+    def __init__(self, *args, **kwargs):
+        # Can't use super() because SafeConfigParser is an old-style class.
+        ConfigParser.SafeConfigParser.__init__(self, *args, **kwargs)
+        # Seed built-in defaults.  Settings read from a configuration file
+        # afterward overwrite these values.
+        for sec_name, settings in {
+            'Arvados': {'insecure': 'no',
+                        'timeout': '15'},
+            'Daemon': {'max_nodes': '1',
+                       'poll_time': '60',
+                       'max_poll_time': '300',
+                       'poll_stale_after': '600',
+                       'node_stale_after': str(60 * 60 * 2)},
+            'Logging': {'file': '/dev/stderr',
+                        'level': 'WARNING'},
+        }.iteritems():
+            if not self.has_section(sec_name):
+                self.add_section(sec_name)
+            for opt_name, value in settings.iteritems():
+                if not self.has_option(sec_name, opt_name):
+                    self.set(sec_name, opt_name, value)
+
+    def get_section(self, section, transformer=None):
+        """Return one section as a dictionary of option names to values.
+
+        If transformer is given, it is applied to every value; values it
+        cannot convert (TypeError/ValueError) are kept as strings.
+        """
+        result = self._dict()
+        for key, value in self.items(section):
+            if transformer is not None:
+                try:
+                    value = transformer(value)
+                except (TypeError, ValueError):
+                    pass
+            result[key] = value
+        return result
+
+    def log_levels(self):
+        # Map each [Logging] option (except non-level options like 'file')
+        # to the corresponding logging module level constant.
+        return {key: getattr(logging, self.get('Logging', key).upper())
+                for key in self.options('Logging')
+                if key not in self.LOGGING_NONLEVELS}
+
+    def new_arvados_client(self):
+        """Build a new Arvados API client from the [Arvados] settings."""
+        if self.has_option('Daemon', 'certs_file'):
+            certs_file = self.get('Daemon', 'certs_file')
+        else:
+            certs_file = None
+        insecure = self.getboolean('Arvados', 'insecure')
+        http = httplib2.Http(timeout=self.getint('Arvados', 'timeout'),
+                             ca_certs=certs_file,
+                             disable_ssl_certificate_validation=insecure)
+        return arvados.api('v1',
+                           cache=False,  # Don't reuse an existing client.
+                           host=self.get('Arvados', 'host'),
+                           token=self.get('Arvados', 'token'),
+                           insecure=insecure,
+                           http=http)
+
+    def new_cloud_client(self):
+        """Build a ComputeNodeDriver for the configured [Cloud] provider."""
+        # The provider name selects an arvnodeman.computenode submodule
+        # (e.g., 'ec2' or 'dummy').
+        module = importlib.import_module('arvnodeman.computenode.' +
+                                         self.get('Cloud', 'provider'))
+        auth_kwargs = self.get_section('Cloud Credentials')
+        if 'timeout' in auth_kwargs:
+            auth_kwargs['timeout'] = int(auth_kwargs['timeout'])
+        return module.ComputeNodeDriver(auth_kwargs,
+                                        self.get_section('Cloud List'),
+                                        self.get_section('Cloud Create'))
+
+    def node_sizes(self, all_sizes):
+        """Pair each configured [Size ...] section with its cloud size.
+
+        Returns a list of (size, kwargs) tuples for every size in all_sizes
+        that has a matching [Size id] configuration section; sizes without
+        a section are dropped.
+        """
+        size_kwargs = {}
+        for sec_name in self.sections():
+            sec_words = sec_name.split(None, 2)
+            if sec_words[0] != 'Size':
+                continue
+            # Integer-valued options (e.g., cores, scratch) are coerced by
+            # get_section's transformer; others stay strings.
+            size_kwargs[sec_words[1]] = self.get_section(sec_name, int)
+        return [(size, size_kwargs[size.id]) for size in all_sizes
+                if size.id in size_kwargs]
+
+    def shutdown_windows(self):
+        # The comma-separated minutes string from [Cloud], as a list of ints.
+        return [int(n)
+                for n in self.get('Cloud', 'shutdown_windows').split(',')]
diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
new file mode 100644
index 0000000..5b7437f
--- /dev/null
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -0,0 +1,294 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import functools
+import logging
+import time
+
+import pykka
+
+from . import computenode as cnode
+from .config import actor_class
+
+class _ComputeNodeRecord(object):
+    """Bookkeeping record for one compute node.
+
+    Holds the node's monitor actor, its cloud node object, its Arvados
+    node record, and the time an Arvados record was last assigned to a
+    setup actor for it (-inf when never assigned).
+    """
+    def __init__(self, actor=None, cloud_node=None, arvados_node=None,
+                 assignment_time=float('-inf')):
+        self.actor = actor
+        self.cloud_node = cloud_node
+        self.arvados_node = arvados_node
+        self.assignment_time = assignment_time
+
+
+class _BaseNodeTracker(object):
+    """Base class mapping keys to _ComputeNodeRecords.
+
+    Subclasses define RECORD_ATTR (the record attribute holding the
+    tracked item), PAIR_ATTR (the attribute that stays None until the
+    record is paired with its counterpart), and item_key (a function
+    from tracked item to dictionary key).
+    """
+    def __init__(self):
+        self.nodes = {}
+        # Records dropped by the most recent update_from() call.
+        self.orphans = {}
+
+    def __getitem__(self, key):
+        return self.nodes[key]
+
+    def __len__(self):
+        return len(self.nodes)
+
+    def get(self, key, default=None):
+        return self.nodes.get(key, default)
+
+    def record_key(self, record):
+        # Key for a record, derived from the item it tracks.
+        return self.item_key(getattr(record, self.RECORD_ATTR))
+
+    def add(self, record):
+        self.nodes[self.record_key(record)] = record
+
+    def update_record(self, key, item):
+        # Replace the tracked item on an existing record in place.
+        setattr(self.nodes[key], self.RECORD_ATTR, item)
+
+    def update_from(self, response):
+        """Update tracked records from a poll response (generator).
+
+        Known items refresh their records in place; new (key, item) pairs
+        are yielded to the caller.  Records absent from the response are
+        moved to self.orphans.  NOTE: because this is a generator, orphans
+        is only updated once the caller consumes the whole iteration.
+        """
+        unseen = set(self.nodes.iterkeys())
+        for item in response:
+            key = self.item_key(item)
+            if key in unseen:
+                unseen.remove(key)
+                self.update_record(key, item)
+            else:
+                yield key, item
+        self.orphans = {key: self.nodes.pop(key) for key in unseen}
+
+    def unpaired(self):
+        # Generator over records not yet paired with their counterpart.
+        return (record for record in self.nodes.itervalues()
+                if getattr(record, self.PAIR_ATTR) is None)
+
+
+class _CloudNodeTracker(_BaseNodeTracker):
+    """Track cloud node records, keyed by cloud node ID."""
+    RECORD_ATTR = 'cloud_node'
+    PAIR_ATTR = 'arvados_node'
+    item_key = staticmethod(lambda cloud_node: cloud_node.id)
+
+
+class _ArvadosNodeTracker(_BaseNodeTracker):
+    """Track Arvados node records, keyed by UUID."""
+    RECORD_ATTR = 'arvados_node'
+    PAIR_ATTR = 'cloud_node'
+    item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
+
+    def find_stale_node(self, stale_time):
+        """Return an Arvados node that is stale for reuse, or None.
+
+        A node is stale when both its record modification time and its
+        last assignment time are older than stale_time seconds.
+        """
+        for record in self.nodes.itervalues():
+            node = record.arvados_node
+            if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
+                                          stale_time) and
+                  not cnode.timestamp_fresh(record.assignment_time,
+                                            stale_time)):
+                return node
+        return None
+
+
+class NodeManagerDaemonActor(actor_class):
+    """Node Manager daemon.
+
+    This actor subscribes to all information polls about cloud nodes,
+    Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
+    for every cloud node, subscribing them to poll updates
+    appropriately.  It creates and destroys cloud nodes based on job queue
+    demand, and stops the corresponding ComputeNode actors when their work
+    is done.
+    """
+    def __init__(self, server_wishlist_actor, arvados_nodes_actor,
+                 cloud_nodes_actor, cloud_update_actor, timer_actor,
+                 arvados_factory, cloud_factory,
+                 shutdown_windows, max_nodes,
+                 poll_stale_after=600, node_stale_after=7200,
+                 node_setup_class=cnode.ComputeNodeSetupActor,
+                 node_shutdown_class=cnode.ComputeNodeShutdownActor,
+                 node_actor_class=cnode.ComputeNodeMonitorActor):
+        super(NodeManagerDaemonActor, self).__init__()
+        self._node_setup = node_setup_class
+        self._node_shutdown = node_shutdown_class
+        self._node_actor = node_actor_class
+        self._cloud_updater = cloud_update_actor
+        self._timer = timer_actor
+        self._new_arvados = arvados_factory
+        self._new_cloud = cloud_factory
+        self._cloud_driver = self._new_cloud()
+        self._logger = logging.getLogger('arvnodeman.daemon')
+        # Proxy to ourself: methods invoked through it are delivered via
+        # our own mailbox (asynchronously) instead of called directly.
+        self._later = self.actor_ref.proxy()
+        self.shutdown_windows = shutdown_windows
+        self.max_nodes = max_nodes
+        self.poll_stale_after = poll_stale_after
+        self.node_stale_after = node_stale_after
+        self.last_polls = {}
+        # Subscribe to each poller, and seed the poll timestamps far in the
+        # past so every poll counts as stale until real data arrives.
+        for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
+            poll_actor = locals()[poll_name + '_actor']
+            poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
+            setattr(self, '_{}_actor'.format(poll_name), poll_actor)
+            self.last_polls[poll_name] = -self.poll_stale_after
+        self.cloud_nodes = _CloudNodeTracker()
+        self.arvados_nodes = _ArvadosNodeTracker()
+        self.booting = {}       # Actor IDs to ComputeNodeSetupActors
+        self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
+        self._logger.debug("Daemon initialized")
+
+    def _update_poll_time(self, poll_key):
+        self.last_polls[poll_key] = time.time()
+
+    def _pair_nodes(self, node_record, arvados_node):
+        # Record the cloud/Arvados pairing and route future Arvados node
+        # updates to the node's monitor actor.
+        self._logger.info("Cloud node %s has associated with Arvados node %s",
+                          node_record.cloud_node.id, arvados_node['uuid'])
+        self._arvados_nodes_actor.subscribe_to(
+            arvados_node['uuid'], node_record.actor.update_arvados_node)
+        node_record.arvados_node = arvados_node
+        self.arvados_nodes.add(node_record)
+
+    def _new_node(self, cloud_node):
+        # Start a monitor actor for a cloud node and begin tracking it.
+        start_time = self._cloud_driver.node_start_time(cloud_node)
+        shutdown_timer = cnode.ShutdownTimer(start_time,
+                                             self.shutdown_windows)
+        actor = self._node_actor.start(
+            cloud_node=cloud_node,
+            cloud_node_start_time=start_time,
+            shutdown_timer=shutdown_timer,
+            update_actor=self._cloud_updater,
+            timer_actor=self._timer,
+            arvados_node=None,
+            poll_stale_after=self.poll_stale_after,
+            node_stale_after=self.node_stale_after).proxy()
+        actor.subscribe(self._later.node_can_shutdown)
+        self._cloud_nodes_actor.subscribe_to(cloud_node.id,
+                                             actor.update_cloud_node)
+        record = _ComputeNodeRecord(actor, cloud_node)
+        self.cloud_nodes.add(record)
+        return record
+
+    def update_cloud_nodes(self, nodelist):
+        """Handle a fresh cloud node listing from the poller."""
+        self._update_poll_time('cloud_nodes')
+        # Start monitors for newly seen cloud nodes, and try to pair each
+        # with an unpaired Arvados node record.
+        for key, node in self.cloud_nodes.update_from(nodelist):
+            self._logger.info("Registering new cloud node %s", key)
+            record = self._new_node(node)
+            for arv_rec in self.arvados_nodes.unpaired():
+                if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
+                    self._pair_nodes(record, arv_rec.arvados_node)
+                    break
+        # Nodes gone from the cloud: stop their monitor actors and any
+        # pending shutdown actors.
+        for key, record in self.cloud_nodes.orphans.iteritems():
+            record.actor.stop()
+            if key in self.shutdowns:
+                self.shutdowns.pop(key).stop()
+
+    def update_arvados_nodes(self, nodelist):
+        """Handle a fresh Arvados node listing from the poller."""
+        self._update_poll_time('arvados_nodes')
+        for key, node in self.arvados_nodes.update_from(nodelist):
+            self._logger.info("Registering new Arvados node %s", key)
+            record = _ComputeNodeRecord(arvados_node=node)
+            self.arvados_nodes.add(record)
+        # Offer every unpaired Arvados record to every unpaired cloud node.
+        for arv_rec in self.arvados_nodes.unpaired():
+            arv_node = arv_rec.arvados_node
+            for cloud_rec in self.cloud_nodes.unpaired():
+                if cloud_rec.actor.offer_arvados_pair(arv_node).get():
+                    self._pair_nodes(cloud_rec, arv_node)
+                    break
+
+    def _node_count(self):
+        # Nodes up or coming up, minus those being shut down.
+        up = sum(len(nodelist) for nodelist in [self.cloud_nodes, self.booting])
+        return up - len(self.shutdowns)
+
+    def _nodes_wanted(self):
+        # Positive: boot more nodes.  Negative: we have excess capacity.
+        return len(self.last_wishlist) - self._node_count()
+
+    def _nodes_excess(self):
+        return -self._nodes_wanted()
+
+    def update_server_wishlist(self, wishlist):
+        """Handle a fresh server wishlist, booting or stopping as needed."""
+        self._update_poll_time('server_wishlist')
+        # Cap the wishlist so we never aim above max_nodes.
+        self.last_wishlist = wishlist[:self.max_nodes]
+        nodes_wanted = self._nodes_wanted()
+        if nodes_wanted > 0:
+            self._later.start_node()
+        elif (nodes_wanted < 0) and self.booting:
+            self._later.stop_booting_node()
+
+    # NOTE: defined in the class body as a decorator, not a method.
+    def _check_poll_freshness(orig_func):
+        """Decorator to inhibit a method when poll information is stale.
+
+        This decorator checks the timestamps of all the poll information the
+        daemon has received.  The decorated method is only called if none
+        of the timestamps are considered stale.
+        """
+        @functools.wraps(orig_func)
+        def wrapper(self, *args, **kwargs):
+            now = time.time()
+            if all(now - t < self.poll_stale_after
+                   for t in self.last_polls.itervalues()):
+                return orig_func(self, *args, **kwargs)
+            else:
+                return None
+        return wrapper
+
+    @_check_poll_freshness
+    def start_node(self):
+        """Boot one new compute node, sized from the wishlist."""
+        nodes_wanted = self._nodes_wanted()
+        if nodes_wanted < 1:
+            return None
+        # Reuse a stale Arvados node record when one is available.
+        arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
+        cloud_size = self.last_wishlist[nodes_wanted - 1]
+        self._logger.info("Want %s more nodes.  Booting a %s node.",
+                          nodes_wanted, cloud_size.name)
+        new_setup = self._node_setup.start(
+            timer_actor=self._timer,
+            arvados_client=self._new_arvados(),
+            arvados_node=arvados_node,
+            cloud_client=self._new_cloud(),
+            cloud_size=cloud_size).proxy()
+        self.booting[new_setup.actor_ref.actor_urn] = new_setup
+        if arvados_node is not None:
+            # Stamp the assignment so find_stale_node won't hand the same
+            # record to another setup actor right away.
+            self.arvados_nodes[arvados_node['uuid']].assignment_time = (
+                time.time())
+        new_setup.subscribe(self._later.node_up)
+        if nodes_wanted > 1:
+            # Boot the rest one at a time, via our own mailbox.
+            self._later.start_node()
+
+    def _actor_nodes(self, node_actor):
+        # Fetch the actor's cloud and Arvados node objects together.
+        return pykka.get_all([node_actor.cloud_node, node_actor.arvados_node])
+
+    def node_up(self, setup_proxy):
+        """A setup actor finished: retire it and track its new node."""
+        cloud_node, arvados_node = self._actor_nodes(setup_proxy)
+        del self.booting[setup_proxy.actor_ref.actor_urn]
+        setup_proxy.stop()
+        record = self.cloud_nodes.get(cloud_node.id)
+        if record is None:
+            # The cloud node poller hasn't listed this node yet.
+            record = self._new_node(cloud_node)
+        self._pair_nodes(record, arvados_node)
+
+    @_check_poll_freshness
+    def stop_booting_node(self):
+        """Cancel one booting node that has no cloud node yet."""
+        nodes_excess = self._nodes_excess()
+        if (nodes_excess < 1) or not self.booting:
+            return None
+        for key, node in self.booting.iteritems():
+            node.stop_if_no_cloud_node().get()
+            if not node.actor_ref.is_alive():
+                # Deleting during iteration is safe only because we break
+                # immediately afterward.
+                del self.booting[key]
+                if nodes_excess > 1:
+                    self._later.stop_booting_node()
+                break
+
+    @_check_poll_freshness
+    def node_can_shutdown(self, node_actor):
+        """A monitor actor reports its node is eligible for shutdown."""
+        if self._nodes_excess() < 1:
+            return None
+        cloud_node, arvados_node = self._actor_nodes(node_actor)
+        if cloud_node.id in self.shutdowns:
+            # A shutdown is already in progress for this node.
+            return None
+        shutdown = self._node_shutdown.start(timer_actor=self._timer,
+                                             cloud_client=self._new_cloud(),
+                                             cloud_node=cloud_node).proxy()
+        self.shutdowns[cloud_node.id] = shutdown
+
+    def shutdown(self):
+        """Begin a graceful daemon shutdown (called on the first signal)."""
+        self._logger.info("Shutting down after signal.")
+        self.poll_stale_after = -1  # Inhibit starting/stopping nodes
+        for bootnode in self.booting.itervalues():
+            bootnode.stop_if_no_cloud_node()
+        self._later.await_shutdown()
+
+    def await_shutdown(self):
+        # Poll once a second until all setup actors have stopped, then exit.
+        if any(node.actor_ref.is_alive() for node in self.booting.itervalues()):
+            self._timer.schedule(time.time() + 1, self._later.await_shutdown)
+        else:
+            self.stop()
diff --git a/services/nodemanager/arvnodeman/jobqueue.py b/services/nodemanager/arvnodeman/jobqueue.py
new file mode 100644
index 0000000..08ee12e
--- /dev/null
+++ b/services/nodemanager/arvnodeman/jobqueue.py
@@ -0,0 +1,96 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+from . import clientactor
+from .config import ARVADOS_ERRORS
+
+class ServerCalculator(object):
+    """Generate cloud server wishlists from an Arvados job queue.
+
+    Instantiate this class with a list of cloud node sizes you're willing to
+    use, plus keyword overrides from the configuration.  Then you can pass
+    job queues to servers_for_queue.  It will return a list of node sizes
+    that would best satisfy the jobs, choosing the cheapest size that
+    satisfies each job, and ignoring jobs that can't be satisfied.
+    """
+
+    class CloudSizeWrapper(object):
+        """A cloud node size plus configuration overrides.
+
+        Copies the listed fields from the real size object; 'cores' is
+        required from the configuration, and any remaining keyword may
+        override an existing field (unknown names raise ValueError).
+        """
+        def __init__(self, real_size, **kwargs):
+            self.real = real_size
+            for name in ['id', 'name', 'ram', 'disk', 'bandwidth', 'price',
+                         'extra']:
+                setattr(self, name, getattr(self.real, name))
+            self.cores = kwargs.pop('cores')
+            # Scratch space defaults to the size's disk unless overridden.
+            self.scratch = self.disk
+            for name, override in kwargs.iteritems():
+                if not hasattr(self, name):
+                    raise ValueError("unrecognized size field '%s'" % (name,))
+                setattr(self, name, override)
+
+        def meets_constraints(self, **kwargs):
+            # A field of 0 is treated as satisfying any requested minimum.
+            for name, want_value in kwargs.iteritems():
+                have_value = getattr(self, name)
+                if (have_value != 0) and (have_value < want_value):
+                    return False
+            return True
+
+
+    def __init__(self, server_list, max_nodes=None):
+        # server_list is a sequence of (size, overrides dict) pairs, as
+        # produced by NodeManagerConfig.node_sizes().
+        self.cloud_sizes = [self.CloudSizeWrapper(s, **kws)
+                            for s, kws in server_list]
+        # Sorted cheapest-first, so the first match is the cheapest match.
+        self.cloud_sizes.sort(key=lambda s: s.price)
+        self.max_nodes = max_nodes or float("inf")
+
+    @staticmethod
+    def coerce_int(x, fallback):
+        """Return x as an int, or fallback when it can't be converted."""
+        try:
+            return int(x)
+        except (TypeError, ValueError):
+            return fallback
+
+    def cloud_size_for_constraints(self, constraints):
+        """Return the cheapest size meeting the constraints, or None."""
+        want_value = lambda key: self.coerce_int(constraints.get(key), 0)
+        wants = {'cores': want_value('min_cores_per_node'),
+                 'ram': want_value('min_ram_mb_per_node'),
+                 'scratch': want_value('min_scratch_mb_per_node')}
+        for size in self.cloud_sizes:
+            if size.meets_constraints(**wants):
+                return size
+        return None
+
+    def servers_for_queue(self, queue):
+        """Return one node size per wanted node, for all satisfiable jobs."""
+        servers = []
+        for job in queue:
+            constraints = job['runtime_constraints']
+            want_count = self.coerce_int(constraints.get('min_nodes'), 1)
+            cloud_size = self.cloud_size_for_constraints(constraints)
+            # NOTE(review): a job asking for exactly max_nodes nodes is
+            # skipped here ('<' rather than '<=') — confirm this is intended.
+            if (want_count < self.max_nodes) and (cloud_size is not None):
+                servers.extend([cloud_size.real] * max(1, want_count))
+        return servers
+
+
+class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
+    """Actor to generate server wishlists from the job queue.
+
+    This actor regularly polls Arvados' job queue, and uses the provided
+    ServerCalculator to turn that into a list of requested node sizes.  That
+    list is sent to subscribers on every poll.
+    """
+
+    CLIENT_ERRORS = ARVADOS_ERRORS
+    LOGGER_NAME = 'arvnodeman.jobqueue'
+
+    def __init__(self, client, timer_actor, server_calc, *args, **kwargs):
+        super(JobQueueMonitorActor, self).__init__(
+            client, timer_actor, *args, **kwargs)
+        self._calculator = server_calc
+
+    def _send_request(self):
+        # Fetch the current list of queued jobs from Arvados.
+        return self._client.jobs().queue().execute()['items']
+
+    def _got_response(self, queue):
+        # Translate the job queue into a node-size wishlist before handing
+        # it to the base class for delivery to subscribers.
+        server_list = self._calculator.servers_for_queue(queue)
+        self._logger.debug("Sending server wishlist: %s",
+                           ', '.join(s.name for s in server_list))
+        return super(JobQueueMonitorActor, self)._got_response(server_list)
diff --git a/services/nodemanager/arvnodeman/launcher.py b/services/nodemanager/arvnodeman/launcher.py
new file mode 100644
index 0000000..87f2dda
--- /dev/null
+++ b/services/nodemanager/arvnodeman/launcher.py
@@ -0,0 +1,130 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import argparse
+import logging
+import signal
+import sys
+import time
+
+import daemon
+import pykka
+
+from . import config as nmconfig
+from .computenode import \
+    ComputeNodeSetupActor, ComputeNodeShutdownActor, ComputeNodeUpdateActor, \
+    ShutdownTimer
+from .daemon import NodeManagerDaemonActor
+from .jobqueue import JobQueueMonitorActor, ServerCalculator
+from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
+from .timedcallback import TimedCallBackActor
+
+node_daemon = None
+
+def abort(msg, code=1):
+    """Print an error message and exit with the given status code."""
+    print("arvados-node-manager: " + msg)
+    sys.exit(code)
+
+def parse_cli(args):
+    """Parse command-line arguments (args=None reads sys.argv)."""
+    parser = argparse.ArgumentParser(
+        prog='arvados-node-manager',
+        description="Dynamically allocate Arvados cloud compute nodes")
+    parser.add_argument(
+        '--foreground', action='store_true', default=False,
+        help="Run in the foreground.  Don't daemonize.")
+    parser.add_argument(
+        '--config', help="Path to configuration file")
+    return parser.parse_args(args)
+
+def load_config(path):
+    """Read and return the NodeManagerConfig at path; abort on failure."""
+    if not path:
+        abort("No --config file specified", 2)
+    config = nmconfig.NodeManagerConfig()
+    try:
+        with open(path) as config_file:
+            config.readfp(config_file)
+    except (IOError, OSError) as error:
+        abort("Error reading configuration file {}: {}".format(path, error))
+    return config
+
+def setup_logging(path, level, **sublevels):
+    """Configure logging to the file at path.
+
+    level sets the root logger; sublevels maps specific logger names
+    (e.g., 'pykka', 'apiclient') to their own levels.
+    """
+    handler = logging.FileHandler(path)
+    handler.setFormatter(logging.Formatter(
+            '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
+            '%Y-%m-%d %H:%M:%S'))
+    root_logger = logging.getLogger()
+    root_logger.addHandler(handler)
+    root_logger.setLevel(level)
+    for logger_name, sublevel in sublevels.iteritems():
+        sublogger = logging.getLogger(logger_name)
+        sublogger.setLevel(sublevel)
+
+def launch_pollers(config):
+    """Start the timer actor and the three poll actors; return their proxies.
+
+    Aborts if none of the configured node sizes match what the cloud
+    driver offers.
+    """
+    cloud_client = config.new_cloud_client()
+    arvados_client = config.new_arvados_client()
+    cloud_size_list = config.node_sizes(cloud_client.list_sizes())
+    if not cloud_size_list:
+        abort("No valid node sizes configured")
+
+    server_calculator = ServerCalculator(
+        cloud_size_list, config.getint('Daemon', 'max_nodes'))
+    poll_time = config.getint('Daemon', 'poll_time')
+    max_poll_time = config.getint('Daemon', 'max_poll_time')
+
+    # Timer granularity is one tenth of the poll time.
+    timer = TimedCallBackActor.start(poll_time / 10.0).proxy()
+    cloud_node_poller = CloudNodeListMonitorActor.start(
+        cloud_client, timer, poll_time, max_poll_time).proxy()
+    arvados_node_poller = ArvadosNodeListMonitorActor.start(
+        arvados_client, timer, poll_time, max_poll_time).proxy()
+    job_queue_poller = JobQueueMonitorActor.start(
+        config.new_arvados_client(), timer, server_calculator,
+        poll_time, max_poll_time).proxy()
+    return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
+
+# How many times each signal type has been received, for escalation.
+_caught_signals = {}
+def shutdown_signal(signal_code, frame):
+    """Signal handler with escalating shutdown behavior.
+
+    Per signal type: the first delivery asks the daemon to shut down
+    gracefully; the second stops all actors immediately; further
+    deliveries (or any signal before the daemon exists) exit outright.
+    """
+    current_count = _caught_signals.get(signal_code, 0)
+    _caught_signals[signal_code] = current_count + 1
+    if node_daemon is None:
+        pykka.ActorRegistry.stop_all()
+        sys.exit(-signal_code)
+    elif current_count == 0:
+        node_daemon.shutdown()
+    elif current_count == 1:
+        pykka.ActorRegistry.stop_all()
+    else:
+        sys.exit(-signal_code)
+
+def main(args=None):
+    """Entry point: parse arguments, start all actors, wait for shutdown."""
+    global node_daemon
+    args = parse_cli(args)
+    config = load_config(args.config)
+
+    # Detach into the background unless asked to stay in the foreground.
+    if not args.foreground:
+        daemon.DaemonContext().open()
+    for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
+        signal.signal(sigcode, shutdown_signal)
+
+    setup_logging(config.get('Logging', 'file'), **config.log_levels())
+    timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
+        launch_pollers(config)
+    cloud_node_updater = ComputeNodeUpdateActor.start(
+        config.new_cloud_client).proxy()
+    node_daemon = NodeManagerDaemonActor.start(
+        job_queue_poller, arvados_node_poller, cloud_node_poller,
+        cloud_node_updater, timer,
+        config.new_arvados_client, config.new_cloud_client,
+        config.shutdown_windows(), config.getint('Daemon', 'max_nodes'),
+        config.getint('Daemon', 'poll_stale_after'),
+        config.getint('Daemon', 'node_stale_after')).proxy()
+
+    # Block until the first signal arrives; shutdown_signal drives the
+    # shutdown sequence, then we wait for the daemon actor to finish.
+    signal.pause()
+    daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
+    while not daemon_stopped():
+        time.sleep(1)
+    pykka.ActorRegistry.stop_all()
+
+
+if __name__ == '__main__':
+    main()
diff --git a/services/nodemanager/arvnodeman/nodelist.py b/services/nodemanager/arvnodeman/nodelist.py
new file mode 100644
index 0000000..7ddfb7c
--- /dev/null
+++ b/services/nodemanager/arvnodeman/nodelist.py
@@ -0,0 +1,39 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+from . import clientactor
+from . import config
+
+class ArvadosNodeListMonitorActor(clientactor.RemotePollLoopActor):
+    """Actor to poll the Arvados node list.
+
+    This actor regularly polls the list of Arvados node records, and
+    sends it to subscribers.
+    """
+
+    CLIENT_ERRORS = config.ARVADOS_ERRORS
+    LOGGER_NAME = 'arvnodeman.arvados_nodes'
+
+    def _item_key(self, node):
+        # Arvados node records are keyed by UUID.
+        return node['uuid']
+
+    def _send_request(self):
+        return self._client.nodes().list(limit=10000).execute()['items']
+
+
+class CloudNodeListMonitorActor(clientactor.RemotePollLoopActor):
+    """Actor to poll the cloud node list.
+
+    This actor regularly polls the cloud to get a list of running compute
+    nodes, and sends it to subscribers.
+    """
+
+    CLIENT_ERRORS = config.CLOUD_ERRORS
+    LOGGER_NAME = 'arvnodeman.cloud_nodes'
+
+    def _item_key(self, node):
+        # Cloud nodes are keyed by the driver-assigned node ID.
+        return node.id
+
+    def _send_request(self):
+        return self._client.list_nodes()
diff --git a/services/nodemanager/arvnodeman/timedcallback.py b/services/nodemanager/arvnodeman/timedcallback.py
new file mode 100644
index 0000000..a1df8ec
--- /dev/null
+++ b/services/nodemanager/arvnodeman/timedcallback.py
@@ -0,0 +1,41 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import heapq
+import time
+
+import pykka
+
+from .config import actor_class
+
+class TimedCallBackActor(actor_class):
+    """Send messages to other actors on a schedule.
+
+    Other actors can call the schedule() method to schedule delivery of a
+    message at a later time.  This actor runs the necessary event loop for
+    delivery.
+    """
+    def __init__(self, max_sleep=1):
+        super(TimedCallBackActor, self).__init__()
+        # Proxy to ourself, so deliver() can re-queue itself as a message.
+        self._proxy = self.actor_ref.proxy()
+        # Min-heap of (delivery_time, receiver, args, kwargs) tuples.
+        self.messages = []
+        self.max_sleep = max_sleep
+
+    def schedule(self, delivery_time, receiver, *args, **kwargs):
+        """Schedule receiver(*args, **kwargs) at delivery_time (epoch secs)."""
+        heapq.heappush(self.messages, (delivery_time, receiver, args, kwargs))
+        # Kick the delivery loop via our own mailbox.
+        self._proxy.deliver()
+
+    def deliver(self):
+        # Deliver the earliest message if it's due, otherwise nap briefly;
+        # either way re-queue ourself so the loop keeps running while
+        # messages remain.
+        if not self.messages:
+            return None
+        til_next = self.messages[0][0] - time.time()
+        if til_next < 0:
+            t, receiver, args, kwargs = heapq.heappop(self.messages)
+            try:
+                receiver(*args, **kwargs)
+            except pykka.ActorDeadError:
+                # The receiver has stopped; drop the message.
+                pass
+        else:
+            # Sleep in bounded chunks so a newly scheduled earlier message
+            # is delayed by at most max_sleep.
+            time.sleep(min(til_next, self.max_sleep))
+        self._proxy.deliver()
diff --git a/services/nodemanager/bin/arvados-node-manager b/services/nodemanager/bin/arvados-node-manager
new file mode 100644
index 0000000..3a91288
--- /dev/null
+++ b/services/nodemanager/bin/arvados-node-manager
@@ -0,0 +1,6 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+# Thin executable wrapper: all logic lives in arvnodeman.launcher.
+from arvnodeman.launcher import main
+main()
diff --git a/services/nodemanager/doc/ec2.example.cfg b/services/nodemanager/doc/ec2.example.cfg
new file mode 100644
index 0000000..a56e69e
--- /dev/null
+++ b/services/nodemanager/doc/ec2.example.cfg
@@ -0,0 +1,121 @@
+# EC2 configuration for Arvados Node Manager.
+# All times are in seconds unless specified otherwise.
+
+[Daemon]
+# Node Manager will not start any compute nodes when at least this
+# many are running.
+max_nodes = 8
+
+# Poll EC2 nodes and Arvados for new information every N seconds.
+poll_time = 60
+
+# Polls have exponential backoff when services fail to respond.
+# This is the longest time to wait between polls.
+max_poll_time = 300
+
+# If Node Manager can't successfully poll a service for this long,
+# it will never start or stop compute nodes, on the assumption that its
+# information is too outdated.
+poll_stale_after = 600
+
+# "Node stale time" affects two related behaviors.
+# 1. If a compute node has been running for at least this long, but it
+# isn't paired with an Arvados node, do not shut it down, but leave it alone.
+# This prevents the node manager from shutting down a node that might
+# actually be doing work, but is having temporary trouble contacting the
+# API server.
+# 2. When the Node Manager starts a new compute node, it will try to reuse
+# an Arvados node that hasn't been updated for this long.
+node_stale_after = 14400
+
+# File path for Certificate Authorities
+certs_file = /etc/ssl/certs/ca-certificates.crt
+
+[Logging]
+# Log file path
+file = /var/log/arvados/node-manager.log
+
+# Log level for most Node Manager messages.
+# Choose one of DEBUG, INFO, WARNING, ERROR, or CRITICAL.
+# WARNING lets you know when polling a service fails.
+# INFO additionally lets you know when a compute node is started or stopped.
+level = INFO
+
+# You can also set different log levels for specific libraries.
+# Pykka is the Node Manager's actor library.
+# Setting this to DEBUG will display tracebacks for uncaught
+# exceptions in the actors, but it's also very chatty.
+pykka = WARNING
+
+# Setting apiclient to INFO will log the URL of every Arvados API request.
+apiclient = WARNING
+
+[Arvados]
+host = zyxwv.arvadosapi.com
+token = ARVADOS_TOKEN
+timeout = 15
+
+# Accept an untrusted SSL certificate from the API server?
+insecure = no
+
+[Cloud]
+provider = ec2
+
+# It's usually most cost-effective to shut down compute nodes during narrow
+# windows of time.  For example, EC2 bills each node by the hour, so the best
+# time to shut down a node is right before a new hour of uptime starts.
+# Shutdown windows define these periods of time.  These are windows in
+# full minutes, separated by commas.  Counting from the time the node is
+# booted, the node WILL NOT shut down for N1 minutes; then it MAY shut down
+# for N2 minutes; then it WILL NOT shut down for N3 minutes; and so on.
+# For example, "54, 5, 1" means the node may shut down from the 54th to the
+# 59th minute of each hour of uptime.
+# Specify at least two windows.  You can add as many as you need beyond that.
+shutdown_windows = 54, 5, 1
+
+[Cloud Credentials]
+key = KEY
+secret = SECRET_KEY
+region = us-east-1
+timeout = 60
+
+[Cloud List]
+# This section defines filters that find compute nodes.
+# Tags that you specify here will automatically be added to nodes you create.
+# Replace colons in Amazon filters with underscores
+# (e.g., write "tag:mytag" as "tag_mytag").
+instance-state-name = running
+tag_arvados-class = dynamic-compute
+tag_cluster = zyxwv
+
+[Cloud Create]
+# New compute nodes will send pings to Arvados at this host.
+# You may specify a port, and use brackets to disambiguate IPv6 addresses.
+ping_host = hostname:port
+
+# Give the name of an SSH key on AWS...
+ex_keyname = string
+
+# ... or a file path for an SSH key that can log in to the compute node.
+# (One or the other, not both.)
+# ssh_key = path
+
+# The EC2 IDs of the image and subnet compute nodes should use.
+image_id = idstring
+subnet_id = idstring
+
+# Comma-separated EC2 IDs for the security group(s) assigned to each
+# compute node.
+security_groups = idstring1, idstring2
+
+[Size t2.medium]
+# You can define any number of Size sections to list EC2 sizes you're
+# willing to use.  The Node Manager should boot the cheapest size(s) that
+# can run jobs in the queue (N.B.: defining more than one size has not been
+# tested yet).
+# Each size section MUST define the number of cores it has.  You may also
+# want to define the number of mebibytes of scratch space for Crunch jobs.
+# You can also override Amazon's provided data fields by setting the same
+# names here.
+cores = 2
+scratch = 100
\ No newline at end of file
diff --git a/services/nodemanager/doc/local.example.cfg b/services/nodemanager/doc/local.example.cfg
new file mode 100644
index 0000000..8a6e626
--- /dev/null
+++ b/services/nodemanager/doc/local.example.cfg
@@ -0,0 +1,41 @@
+# You can use this configuration to run a development Node Manager for
+# testing.  It uses libcloud's dummy driver and your own development API server.
+# When new cloud nodes are created, you'll need to simulate the ping that
+# they send to the Arvados API server.  The easiest way I've found to do that
+# is through the API server Rails console: load the Node object, set its
+# IP address to 10.10.0.N (where N is the cloud node's ID), and save.
+
+[Daemon]
+max_nodes = 8
+poll_time = 15
+max_poll_time = 60
+poll_stale_after = 600
+node_stale_after = 300
+certs_file = /etc/ssl/certs/ca-certificates.crt
+
+[Logging]
+level = DEBUG
+pykka = DEBUG
+apiclient = WARNING
+
+[Arvados]
+host = localhost:3030
+# This is the token for the test fixture's admin user.
+token = 4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h
+insecure = yes
+timeout = 15
+
+[Cloud]
+provider = dummy
+shutdown_windows = 1, 1
+timeout = 15
+
+[Cloud Credentials]
+creds = dummycreds
+
+[Cloud List]
+[Cloud Create]
+
+[Size 2]
+cores = 4
+scratch = 1234
diff --git a/services/nodemanager/setup.py b/services/nodemanager/setup.py
new file mode 100644
index 0000000..fabb883
--- /dev/null
+++ b/services/nodemanager/setup.py
@@ -0,0 +1,41 @@
#!/usr/bin/env python

import os
import subprocess
import time

from setuptools import setup, find_packages

SETUP_DIR = os.path.dirname(__file__) or "."
cmd_opts = {'egg_info': {}}
try:
    # Stamp the build with the commit timestamp and abbreviated hash of the
    # most recent commit touching this directory, when git metadata exists.
    commit_info = subprocess.check_output(
        ['git', 'log', '--first-parent', '--max-count=1',
         '--format=format:%ct %h', SETUP_DIR]).split()
    assert len(commit_info) == 2
except (AssertionError, OSError, subprocess.CalledProcessError):
    # Not a git checkout, or git is unavailable: build without a version tag.
    pass
else:
    commit_time, commit_hash = commit_info
    build_stamp = time.strftime('%Y%m%d%H%M%S', time.gmtime(int(commit_time)))
    cmd_opts['egg_info']['tag_build'] = '.{}.{}'.format(build_stamp,
                                                        commit_hash)

setup(name='arvados-node-manager',
      version='0.1',
      description='Arvados compute node manager',
      author='Arvados',
      # Restored: the mailing-list archive mangled this to "info at arvados.org".
      author_email='info@arvados.org',
      url="https://arvados.org",
      license='GNU Affero General Public License, version 3.0',
      packages=find_packages(),
      install_requires=[
        'apache-libcloud',
        'arvados-python-client',
        'pykka',
        'python-daemon',
        ],
      scripts=['bin/arvados-node-manager'],
      test_suite='tests',
      tests_require=['mock>=1.0'],
      zip_safe=False,
      options=cmd_opts,
      )
diff --git a/services/nodemanager/tests/__init__.py b/services/nodemanager/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/services/nodemanager/tests/test_clientactor.py b/services/nodemanager/tests/test_clientactor.py
new file mode 100644
index 0000000..0db0a33
--- /dev/null
+++ b/services/nodemanager/tests/test_clientactor.py
@@ -0,0 +1,127 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import unittest
+
+import mock
+import pykka
+
+import arvnodeman.clientactor as clientactor
+from . import testutil
+
class RemotePollLoopActorTestCase(testutil.RemotePollLoopActorTestMixin,
                                  unittest.TestCase):
    """Subscription and error-recovery behavior of RemotePollLoopActor."""

    class MockClientError(Exception):
        pass

    class TestActor(clientactor.RemotePollLoopActor):
        LOGGER_NAME = 'arvnodeman.testpoll'

        def _send_request(self):
            return self._client()
    # Assigned after the class body so MockClientError resolves correctly.
    TestActor.CLIENT_ERRORS = (MockClientError,)
    TEST_CLASS = TestActor


    def build_monitor(self, side_effect, *args, **kwargs):
        # Extend the mixin's builder so each test can script what the mocked
        # client returns (or raises) on successive polls.
        super(RemotePollLoopActorTestCase, self).build_monitor(*args, **kwargs)
        self.client.side_effect = side_effect

    def test_poll_loop_starts_after_subscription(self):
        self.build_monitor(['test1'])
        self.monitor.subscribe(self.subscriber)
        self.wait_for_call(self.subscriber)
        self.subscriber.assert_called_with('test1')
        # The loop should also have scheduled its next poll.
        self.wait_for_call(self.timer.schedule)

    def test_poll_loop_continues_after_failure(self):
        self.build_monitor(self.MockClientError)
        self.monitor.subscribe(self.subscriber)
        self.wait_for_call(self.timer.schedule)
        self.assertTrue(self.monitor.actor_ref.is_alive(),
                        "poll loop died after error")
        self.assertFalse(self.subscriber.called,
                         "poll loop notified subscribers after error")

    def test_late_subscribers_get_responses(self):
        self.build_monitor(['late_test'])
        self.monitor.subscribe(lambda response: None)
        self.monitor.subscribe(self.subscriber)
        self.monitor.poll()
        self.wait_for_call(self.subscriber)
        self.subscriber.assert_called_with('late_test')

    def test_survive_dead_subscriptions(self):
        self.build_monitor(['survive1', 'survive2'])
        doomed = mock.Mock(name='dead_subscriber')
        doomed.side_effect = pykka.ActorDeadError
        self.monitor.subscribe(doomed)
        self.wait_for_call(doomed)
        self.monitor.subscribe(self.subscriber)
        self.monitor.poll()
        self.wait_for_call(self.subscriber)
        # The healthy subscriber gets the second response even though the
        # first subscriber's actor died.
        self.subscriber.assert_called_with('survive2')
        self.assertTrue(self.monitor.actor_ref.is_alive(),
                        "poll loop died from dead subscriber")

    def test_no_subscriptions_by_key_without_support(self):
        self.build_monitor([])
        with self.assertRaises(AttributeError):
            self.monitor.subscribe_to('key')
+
+
class RemotePollLoopActorWithKeysTestCase(testutil.RemotePollLoopActorTestMixin,
                                          unittest.TestCase):
    """Per-key subscription behavior of RemotePollLoopActor."""

    class TestActor(RemotePollLoopActorTestCase.TestActor):
        def _item_key(self, item):
            return item['key']
    TEST_CLASS = TestActor


    def build_monitor(self, side_effect, *args, **kwargs):
        # Same as the keyless case: script the mocked client's responses.
        super(RemotePollLoopActorWithKeysTestCase, self).build_monitor(
            *args, **kwargs)
        self.client.side_effect = side_effect

    def test_key_subscription(self):
        self.build_monitor([[{'key': 1}, {'key': 2}]])
        self.monitor.subscribe_to(2, self.subscriber)
        self.wait_for_call(self.subscriber)
        # Only the item matching the subscribed key is delivered.
        self.subscriber.assert_called_with({'key': 2})

    def test_survive_dead_key_subscriptions(self):
        item = {'key': 3}
        self.build_monitor([[item], [item]])
        doomed = mock.Mock(name='dead_subscriber')
        doomed.side_effect = pykka.ActorDeadError
        self.monitor.subscribe_to(3, doomed)
        self.wait_for_call(doomed)
        self.monitor.subscribe_to(3, self.subscriber)
        self.monitor.poll()
        self.wait_for_call(self.subscriber)
        self.subscriber.assert_called_with(item)
        self.assertTrue(self.monitor.actor_ref.is_alive(),
                        "poll loop died from dead key subscriber")

    def test_mixed_subscriptions(self):
        item = {'key': 4}
        self.build_monitor([[item], [item]])
        key_subscriber = mock.Mock(name='key_subscriber')
        self.monitor.subscribe(self.subscriber)
        self.monitor.subscribe_to(4, key_subscriber)
        self.monitor.poll()
        self.wait_for_call(self.subscriber)
        # Whole-response subscribers get the list; key subscribers the item.
        self.subscriber.assert_called_with([item])
        key_subscriber.assert_called_with(item)

    def test_subscription_to_missing_key(self):
        self.build_monitor([[]])
        self.monitor.subscribe_to('nonesuch', self.subscriber)
        self.wait_for_call(self.subscriber)
        # A key absent from the response is delivered as None.
        self.subscriber.assert_called_with(None)
+
+
+if __name__ == '__main__':
+    unittest.main()
+
diff --git a/services/nodemanager/tests/test_computenode.py b/services/nodemanager/tests/test_computenode.py
new file mode 100644
index 0000000..2fc7a50
--- /dev/null
+++ b/services/nodemanager/tests/test_computenode.py
@@ -0,0 +1,272 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import time
+import unittest
+
+import arvados.errors as arverror
+import httplib2
+import mock
+import pykka
+
+import arvnodeman.computenode as cnode
+from . import testutil
+
class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
    """ComputeNodeSetupActor: Arvados record + cloud node bootstrap."""

    @staticmethod
    def _api_500():
        # A retryable Arvados API failure, as tests use it.
        return arverror.ApiError(httplib2.Response({'status': '500'}), "")

    def make_mocks(self, arvados_effect=None, cloud_effect=None):
        # NOTE(review): the default arvados_effect is a single mock record,
        # not a one-element list as callers pass -- confirm this asymmetry
        # is intended.
        if arvados_effect is None:
            arvados_effect = testutil.arvados_node_mock()
        self.timer = testutil.MockTimer()
        self.api_client = mock.MagicMock(name='api_client')
        self.api_client.nodes().create().execute.side_effect = arvados_effect
        self.api_client.nodes().update().execute.side_effect = arvados_effect
        self.cloud_client = mock.MagicMock(name='cloud_client')
        self.cloud_client.create_node.return_value = testutil.cloud_node_mock(1)

    def make_actor(self, arv_node=None):
        if not hasattr(self, 'timer'):
            self.make_mocks()
        self.setup_actor = cnode.ComputeNodeSetupActor.start(
            self.timer, self.api_client, self.cloud_client,
            testutil.MockSize(1), arv_node).proxy()

    def test_creation_without_arvados_node(self):
        self.make_actor()
        # A fresh Arvados record is created, then the cloud node.
        self.wait_for_call(self.api_client.nodes().create().execute)
        self.wait_for_call(self.cloud_client.create_node)

    def test_creation_with_arvados_node(self):
        arv_node = testutil.arvados_node_mock()
        self.make_mocks([arv_node])
        self.make_actor(arv_node)
        # An existing Arvados record is updated instead of created.
        self.wait_for_call(self.api_client.nodes().update().execute)
        self.wait_for_call(self.cloud_client.create_node)

    def test_failed_calls_retried(self):
        self.make_mocks([self._api_500(), testutil.arvados_node_mock()])
        self.make_actor()
        self.wait_for_call(self.cloud_client.create_node)

    def test_stop_when_no_cloud_node(self):
        self.make_mocks(self._api_500())
        self.make_actor()
        self.wait_for_call(self.api_client.nodes().create().execute)
        self.setup_actor.stop_if_no_cloud_node()
        self.assertTrue(
            self.setup_actor.actor_ref.actor_stopped.wait(self.TIMEOUT))

    def test_no_stop_when_cloud_node(self):
        self.make_actor()
        self.wait_for_call(self.cloud_client.create_node)
        self.setup_actor.stop_if_no_cloud_node().get(self.TIMEOUT)
        self.assertFalse(self.setup_actor.actor_ref.actor_stopped.is_set())

    def test_subscribe(self):
        self.make_mocks(self._api_500())
        self.make_actor()
        subscriber = mock.Mock(name='subscriber_mock')
        self.setup_actor.subscribe(subscriber)
        # Let the pending create finally succeed; the subscriber should
        # then be notified with the setup actor itself.
        self.api_client.nodes().create().execute.side_effect = [
            testutil.arvados_node_mock()]
        self.wait_for_call(subscriber)
        self.assertEqual(self.setup_actor.actor_ref.actor_urn,
                         subscriber.call_args[0][0].actor_ref.actor_urn)

    def test_late_subscribe(self):
        self.make_actor()
        subscriber = mock.Mock(name='subscriber_mock')
        self.wait_for_call(self.cloud_client.create_node)
        self.setup_actor.subscribe(subscriber)
        self.wait_for_call(subscriber)
        self.assertEqual(self.setup_actor.actor_ref.actor_urn,
                         subscriber.call_args[0][0].actor_ref.actor_urn)
+
+
class ComputeNodeShutdownActorTestCase(testutil.ActorTestMixin,
                                       unittest.TestCase):
    """ComputeNodeShutdownActor: destroys its cloud node on startup."""

    def make_mocks(self, cloud_node=None):
        self.timer = testutil.MockTimer()
        self.cloud_client = mock.MagicMock(name='cloud_client')
        # Default to a generic cloud node when the test doesn't supply one.
        self.cloud_node = (testutil.cloud_node_mock()
                           if cloud_node is None else cloud_node)

    def make_actor(self, arv_node=None):
        if not hasattr(self, 'timer'):
            self.make_mocks()
        self.shutdown_actor = cnode.ComputeNodeShutdownActor.start(
            self.timer, self.cloud_client, self.cloud_node).proxy()

    def test_easy_shutdown(self):
        self.make_actor()
        self.wait_for_call(self.cloud_client.destroy_node)
+
+
class ComputeNodeUpdateActorTestCase(testutil.ActorTestMixin,
                                     unittest.TestCase):
    """ComputeNodeUpdateActor: proxies sync requests to the cloud driver."""

    def make_actor(self):
        self.driver = mock.MagicMock(name='driver_mock')
        self.updater = cnode.ComputeNodeUpdateActor.start(self.driver).proxy()

    def test_node_sync(self):
        self.make_actor()
        cnode_mock = testutil.cloud_node_mock()
        anode_mock = testutil.arvados_node_mock()
        self.updater.sync_node(cnode_mock, anode_mock).get(self.TIMEOUT)
        # The actor should forward both records to the driver instance.
        self.driver().sync_node.assert_called_with(cnode_mock, anode_mock)
+
+
@mock.patch('time.time', return_value=1)
class ShutdownTimerTestCase(unittest.TestCase):
    """ShutdownTimer window arithmetic, with time.time() pinned to 1.

    (The decorator line above was mangled to " at mock.patch..." by the
    mailing-list archiver; restored to valid "@" syntax.)
    """

    def test_two_length_window(self, time_mock):
        # Window opens 8 minutes after start and stays open for 2 minutes.
        timer = cnode.ShutdownTimer(time_mock.return_value, [8, 2])
        self.assertEqual(481, timer.next_opening())
        self.assertFalse(timer.window_open())
        time_mock.return_value += 500
        # Inside the open window; the next opening is a full cycle later.
        self.assertEqual(1081, timer.next_opening())
        self.assertTrue(timer.window_open())
        time_mock.return_value += 200
        self.assertEqual(1081, timer.next_opening())
        self.assertFalse(timer.window_open())

    def test_three_length_window(self, time_mock):
        # Open at +6 minutes for 3 minutes; third value extends the cycle.
        timer = cnode.ShutdownTimer(time_mock.return_value, [6, 3, 1])
        self.assertEqual(361, timer.next_opening())
        self.assertFalse(timer.window_open())
        time_mock.return_value += 400
        self.assertEqual(961, timer.next_opening())
        self.assertTrue(timer.window_open())
        time_mock.return_value += 200
        self.assertEqual(961, timer.next_opening())
        self.assertFalse(timer.window_open())
+
+
class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
                                      unittest.TestCase):
    """ComputeNodeMonitorActor: shutdown scheduling, pairing, and updates."""

    class MockShutdownTimer(object):
        def _set_state(self, is_open, next_opening):
            self.window_open = lambda: is_open
            self.next_opening = lambda: next_opening


    def make_mocks(self, node_num):
        self.shutdowns = self.MockShutdownTimer()
        self.shutdowns._set_state(False, 300)
        self.timer = mock.MagicMock(name='timer_mock')
        self.updates = mock.MagicMock(name='update_mock')
        self.cloud_mock = testutil.cloud_node_mock(node_num)
        self.subscriber = mock.Mock(name='subscriber_mock')

    def make_actor(self, node_num=1, arv_node=None, start_time=None):
        if not hasattr(self, 'cloud_mock'):
            self.make_mocks(node_num)
        if start_time is None:
            start_time = time.time()
        # Bug fix: a stray unconditional `start_time = time.time()` here used
        # to clobber explicit arguments, defeating make_actor(start_time=0)
        # in test_no_shutdown_without_arvados_node_and_old_cloud_node.
        self.node_actor = cnode.ComputeNodeMonitorActor.start(
            self.cloud_mock, start_time, self.shutdowns, self.timer,
            self.updates, arv_node).proxy()
        self.node_actor.subscribe(self.subscriber)

    def test_init_shutdown_scheduling(self):
        self.make_actor()
        self.wait_for_call(self.timer.schedule)
        # The first shutdown check is scheduled for the next window opening.
        self.assertEqual(300, self.timer.schedule.call_args[0][0])

    def test_shutdown_subscription(self):
        self.make_actor()
        self.shutdowns._set_state(True, 600)
        self.node_actor.consider_shutdown()
        self.wait_for_call(self.subscriber)
        self.assertEqual(self.node_actor.actor_ref.actor_urn,
                         self.subscriber.call_args[0][0].actor_ref.actor_urn)

    def test_shutdown_without_arvados_node(self):
        self.make_actor()
        self.shutdowns._set_state(True, 600)
        self.node_actor.consider_shutdown()
        self.wait_for_call(self.subscriber)

    def test_no_shutdown_without_arvados_node_and_old_cloud_node(self):
        # start_time=0 makes the cloud node "old"; this only works now that
        # make_actor no longer overwrites the argument.
        self.make_actor(start_time=0)
        self.shutdowns._set_state(True, 600)
        self.node_actor.consider_shutdown()
        self.assertFalse(self.subscriber.called)

    def check_shutdown_rescheduled(self, window_open, next_window,
                                   schedule_time=None):
        # Assert that consider_shutdown() declines to shut down and instead
        # reschedules the next check (optionally at an exact time).
        self.shutdowns._set_state(window_open, next_window)
        self.timer.schedule.reset_mock()
        self.node_actor.consider_shutdown()
        self.wait_for_call(self.timer.schedule)
        if schedule_time is not None:
            self.assertEqual(schedule_time, self.timer.schedule.call_args[0][0])
        self.assertFalse(self.subscriber.called)

    def test_shutdown_window_close_scheduling(self):
        self.make_actor()
        self.check_shutdown_rescheduled(False, 600, 600)

    def test_no_shutdown_when_node_running_job(self):
        self.make_actor(4, testutil.arvados_node_mock(4, job_uuid=True))
        self.check_shutdown_rescheduled(True, 600)

    def test_no_shutdown_when_node_state_unknown(self):
        self.make_actor(5, testutil.arvados_node_mock(5, info={}))
        self.check_shutdown_rescheduled(True, 600)

    def test_no_shutdown_when_node_state_stale(self):
        self.make_actor(6, testutil.arvados_node_mock(6, age=900))
        self.check_shutdown_rescheduled(True, 600)

    def test_arvados_node_match(self):
        self.make_actor(2)
        arv_node = testutil.arvados_node_mock(
            2, hostname='compute-two.zzzzz.arvadosapi.com')
        pair_future = self.node_actor.offer_arvados_pair(arv_node)
        self.assertEqual(self.cloud_mock.id, pair_future.get(self.TIMEOUT))
        # Pairing triggers a sync of the Arvados record to the cloud node.
        self.wait_for_call(self.updates.sync_node)
        self.updates.sync_node.assert_called_with(self.cloud_mock, arv_node)

    def test_arvados_node_mismatch(self):
        self.make_actor(3)
        arv_node = testutil.arvados_node_mock(1)
        pair_future = self.node_actor.offer_arvados_pair(arv_node)
        self.assertIsNone(pair_future.get(self.TIMEOUT))

    def test_update_cloud_node(self):
        self.make_actor(1)
        # Build a second set of mocks so the "new" cloud node has node 2's
        # address, then give it node 1's ID so the update is accepted.
        self.make_mocks(2)
        self.cloud_mock.id = '1'
        self.node_actor.update_cloud_node(self.cloud_mock)
        current_cloud = self.node_actor.cloud_node.get(self.TIMEOUT)
        self.assertEqual([testutil.ip_address_mock(2)],
                         current_cloud.private_ips)

    def test_missing_cloud_node_update(self):
        self.make_actor(1)
        # A None update must leave the existing cloud node untouched.
        self.node_actor.update_cloud_node(None)
        current_cloud = self.node_actor.cloud_node.get(self.TIMEOUT)
        self.assertEqual([testutil.ip_address_mock(1)],
                         current_cloud.private_ips)

    def test_update_arvados_node(self):
        self.make_actor(3)
        job_uuid = 'zzzzz-jjjjj-updatejobnode00'
        new_arvados = testutil.arvados_node_mock(3, job_uuid)
        self.node_actor.update_arvados_node(new_arvados)
        current_arvados = self.node_actor.arvados_node.get(self.TIMEOUT)
        self.assertEqual(job_uuid, current_arvados['job_uuid'])

    def test_missing_arvados_node_update(self):
        self.make_actor(4, testutil.arvados_node_mock(4))
        # A None update must leave the existing Arvados record untouched.
        self.node_actor.update_arvados_node(None)
        current_arvados = self.node_actor.arvados_node.get(self.TIMEOUT)
        self.assertEqual(testutil.ip_address_mock(4),
                         current_arvados['ip_address'])
diff --git a/services/nodemanager/tests/test_computenode_ec2.py b/services/nodemanager/tests/test_computenode_ec2.py
new file mode 100644
index 0000000..d1c9e43
--- /dev/null
+++ b/services/nodemanager/tests/test_computenode_ec2.py
@@ -0,0 +1,89 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import time
+import unittest
+
+import mock
+
+import arvnodeman.computenode.ec2 as ec2
+from . import testutil
+
class EC2ComputeNodeDriverTestCase(unittest.TestCase):
    """Tests for the EC2 ComputeNodeDriver wrapper around libcloud."""

    def setUp(self):
        self.driver_mock = mock.MagicMock(name='driver_mock')

    def new_driver(self, auth_kwargs=None, list_kwargs=None,
                   create_kwargs=None):
        """Build a ComputeNodeDriver around the mocked libcloud driver.

        Bug fix: these parameters previously defaulted to shared mutable
        dicts, and create_kwargs.setdefault() mutated that shared default
        across every test.  Use None sentinels instead.
        """
        auth_kwargs = {} if auth_kwargs is None else auth_kwargs
        list_kwargs = {} if list_kwargs is None else list_kwargs
        create_kwargs = {} if create_kwargs is None else create_kwargs
        create_kwargs.setdefault('ping_host', '100::')
        return ec2.ComputeNodeDriver(
            auth_kwargs, list_kwargs, create_kwargs,
            driver_class=self.driver_mock)

    def test_driver_instantiation(self):
        kwargs = {'key': 'testkey'}
        driver = self.new_driver(auth_kwargs=kwargs)
        self.assertTrue(self.driver_mock.called)
        self.assertEqual(kwargs, self.driver_mock.call_args[1])

    def test_list_kwargs_become_filters(self):
        # We're also testing tag name translation (tag_x -> tag:x).
        driver = self.new_driver(list_kwargs={'tag_test': 'true'})
        driver.list_nodes()
        list_method = self.driver_mock().list_nodes
        self.assertTrue(list_method.called)
        self.assertEqual({'tag:test': 'true'},
                          list_method.call_args[1].get('ex_filters'))

    def test_create_location_loaded_at_initialization(self):
        kwargs = {'location': 'testregion'}
        driver = self.new_driver(create_kwargs=kwargs)
        # NOTE(review): a MagicMock attribute is always truthy, so this
        # assertion is vacuous as written -- should it be .called?
        self.assertTrue(self.driver_mock().list_locations)

    def test_create_image_loaded_at_initialization(self):
        kwargs = {'image': 'testimage'}
        driver = self.new_driver(create_kwargs=kwargs)
        # NOTE(review): same vacuous-truthiness concern as above.
        self.assertTrue(self.driver_mock().list_images)

    def test_create_includes_ping_secret(self):
        arv_node = testutil.arvados_node_mock(info={'ping_secret': 'ssshh'})
        driver = self.new_driver()
        driver.create_node(testutil.MockSize(1), arv_node)
        create_method = self.driver_mock().create_node
        self.assertTrue(create_method.called)
        self.assertIn('ping_secret=ssshh',
                      create_method.call_args[1].get('ex_userdata',
                                                     'arg missing'))

    def test_tags_created_from_arvados_node(self):
        arv_node = testutil.arvados_node_mock(8)
        cloud_node = testutil.cloud_node_mock(8)
        driver = self.new_driver(list_kwargs={'tag:list': 'test'})
        self.assertEqual({'ex_metadata': {'list': 'test'},
                          'name': 'compute8.zzzzz.arvadosapi.com'},
                         driver.arvados_create_kwargs(arv_node))

    def test_tags_set_default_hostname_from_new_arvados_node(self):
        arv_node = testutil.arvados_node_mock(hostname=None)
        driver = self.new_driver()
        actual = driver.arvados_create_kwargs(arv_node)
        self.assertEqual('dynamic.compute.zzzzz.arvadosapi.com',
                         actual['name'])

    def test_sync_node(self):
        arv_node = testutil.arvados_node_mock(1)
        cloud_node = testutil.cloud_node_mock(2)
        driver = self.new_driver()
        driver.sync_node(cloud_node, arv_node)
        tag_mock = self.driver_mock().ex_create_tags
        self.assertTrue(tag_mock.called)
        self.assertEqual('compute1.zzzzz.arvadosapi.com',
                         tag_mock.call_args[0][1].get('Name', 'no name'))

    def test_node_create_time(self):
        # Round-trip: format a known time the way EC2 reports launch_time,
        # then check node_start_time() recovers the original epoch seconds.
        refsecs = int(time.time())
        reftuple = time.gmtime(refsecs)
        node = testutil.cloud_node_mock()
        node.extra = {'launch_time': time.strftime('%Y-%m-%dT%H:%M:%S.000Z',
                                                   reftuple)}
        self.assertEqual(refsecs, ec2.ComputeNodeDriver.node_start_time(node))
diff --git a/services/nodemanager/tests/test_config.py b/services/nodemanager/tests/test_config.py
new file mode 100644
index 0000000..3aa9541
--- /dev/null
+++ b/services/nodemanager/tests/test_config.py
@@ -0,0 +1,65 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import io
+import logging
+import unittest
+
+import arvnodeman.config as nmconfig
+
class NodeManagerConfigTestCase(unittest.TestCase):
    """Parsing and derived accessors of NodeManagerConfig."""

    TEST_CONFIG = u"""
[Cloud]
provider = dummy
shutdown_windows = 52, 6, 2

[Cloud Credentials]
creds = dummy_creds

[Cloud List]
[Cloud Create]

[Size 1]
cores = 1

[Logging]
file = /dev/null
level = DEBUG
testlogger = INFO
"""

    def load_config(self, config=None, config_str=None):
        """Parse config_str (default TEST_CONFIG) into config and return it."""
        if config is None:
            config = nmconfig.NodeManagerConfig()
        if config_str is None:
            config_str = self.TEST_CONFIG
        with io.StringIO(config_str) as config_fp:
            config.readfp(config_fp)
        return config

    def test_seeded_defaults(self):
        sec_names = set(nmconfig.NodeManagerConfig().sections())
        self.assertIn('Arvados', sec_names)
        self.assertIn('Daemon', sec_names)
        # No Size sections are seeded by default.
        self.assertFalse(any(name.startswith('Size ') for name in sec_names))

    def test_list_sizes(self):
        config = self.load_config()
        cloud_client = config.new_cloud_client()
        sizes = config.node_sizes(cloud_client.list_sizes())
        self.assertEqual(1, len(sizes))
        size, size_kwargs = sizes[0]
        self.assertEqual('Small', size.name)
        self.assertEqual(1, size_kwargs['cores'])

    def test_shutdown_windows(self):
        self.assertEqual([52, 6, 2], self.load_config().shutdown_windows())

    def test_log_levels(self):
        expected = {'level': logging.DEBUG,
                    'testlogger': logging.INFO}
        self.assertEqual(expected, self.load_config().log_levels())
diff --git a/services/nodemanager/tests/test_daemon.py b/services/nodemanager/tests/test_daemon.py
new file mode 100644
index 0000000..176b096
--- /dev/null
+++ b/services/nodemanager/tests/test_daemon.py
@@ -0,0 +1,158 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import time
+import unittest
+
+import mock
+
+import arvnodeman.daemon as nmdaemon
+from . import testutil
+
class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                     unittest.TestCase):
    """NodeManagerDaemonActor behavior with all collaborators mocked."""

    def make_daemon(self, cloud_nodes=[], arvados_nodes=[], want_sizes=[]):
        # One MagicMock poller per subscription source.
        for name in ('cloud_nodes', 'arvados_nodes', 'server_wishlist'):
            setattr(self, name + '_poller', mock.MagicMock(name=name + '_mock'))
        self.arv_factory = mock.MagicMock(name='arvados_mock')
        self.cloud_factory = mock.MagicMock(name='cloud_mock')
        self.cloud_factory().node_start_time.return_value = time.time()
        self.cloud_updates = mock.MagicMock(name='updates_mock')
        self.timer = testutil.MockTimer()
        self.node_factory = mock.MagicMock(name='factory_mock')
        self.node_setup = mock.MagicMock(name='setup_mock')
        self.node_shutdown = mock.MagicMock(name='shutdown_mock')
        self.daemon = nmdaemon.NodeManagerDaemonActor.start(
            self.server_wishlist_poller, self.arvados_nodes_poller,
            self.cloud_nodes_poller, self.cloud_updates, self.timer,
            self.arv_factory, self.cloud_factory,
            [54, 5, 1], 8, 600, 3600,
            self.node_setup, self.node_shutdown, self.node_factory).proxy()
        # Passing None skips the corresponding update, so a test can drive
        # that update itself later.
        if cloud_nodes is not None:
            self.daemon.update_cloud_nodes(cloud_nodes)
        if arvados_nodes is not None:
            self.daemon.update_arvados_nodes(arvados_nodes)
        if want_sizes is not None:
            self.daemon.update_server_wishlist(want_sizes)

    def test_easy_node_creation(self):
        self.make_daemon(want_sizes=[testutil.MockSize(1)])
        self.wait_for_call(self.node_setup.start)

    def test_node_pairing(self):
        cloud_node = testutil.cloud_node_mock(1)
        arv_node = testutil.arvados_node_mock(1)
        self.make_daemon([cloud_node], [arv_node])
        self.wait_for_call(self.node_factory.start)
        pair_func = self.node_factory.start().proxy().offer_arvados_pair
        self.wait_for_call(pair_func)
        pair_func.assert_called_with(arv_node)

    def test_node_pairing_after_arvados_update(self):
        cloud_node = testutil.cloud_node_mock(2)
        arv_node = testutil.arvados_node_mock(2, ip_address=None)
        self.make_daemon([cloud_node], None)
        pair_func = self.node_factory.start().proxy().offer_arvados_pair
        # First offer is declined (no IP address yet)...
        pair_func().get.return_value = None
        self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
        pair_func.assert_called_with(arv_node)

        # ...then accepted once the record has an address.
        pair_func().get.return_value = cloud_node.id
        pair_func.reset_mock()
        arv_node = testutil.arvados_node_mock(2)
        self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
        pair_func.assert_called_with(arv_node)

    def test_old_arvados_node_not_double_assigned(self):
        arv_node = testutil.arvados_node_mock(3, age=9000)
        size = testutil.MockSize(3)
        self.make_daemon(arvados_nodes=[arv_node], want_sizes=[size, size])
        starter = self.node_setup.start
        deadline = time.time() + self.TIMEOUT
        # Poll until both setup actors have been started, or we time out.
        while starter.call_count < 2 and time.time() < deadline:
            time.sleep(.1)
        self.assertEqual(2, starter.call_count)
        used_nodes = [call[1].get('arvados_node')
                      for call in starter.call_args_list]
        # The stale record is reused exactly once; the second boot gets None.
        self.assertIn(arv_node, used_nodes)
        self.assertIn(None, used_nodes)

    def test_node_count_satisfied(self):
        self.make_daemon([testutil.cloud_node_mock()])
        self.daemon.update_server_wishlist(
            [testutil.MockSize(1)]).get(self.TIMEOUT)
        self.assertFalse(self.node_setup.called)

    def test_booting_nodes_counted(self):
        cloud_node = testutil.cloud_node_mock(1)
        arv_node = testutil.arvados_node_mock(1)
        server_wishlist = [testutil.MockSize(1)] * 2
        self.make_daemon([cloud_node], [arv_node], server_wishlist)
        self.wait_for_call(self.node_setup.start)
        self.node_setup.reset_mock()
        # Re-sending the same wishlist must not boot a third node.
        self.daemon.update_server_wishlist(server_wishlist).get(self.TIMEOUT)
        self.assertFalse(self.node_setup.called)

    def test_no_duplication_when_booting_node_listed_fast(self):
        # Test that we don't start two ComputeNodeMonitorActors when
        # we learn about a booting node through a listing before we
        # get the "node up" message from CloudNodeSetupActor.
        cloud_node = testutil.cloud_node_mock(1)
        self.make_daemon(want_sizes=[testutil.MockSize(1)])
        self.wait_for_call(self.node_setup.start)
        setup = mock.MagicMock(name='setup_node_mock')
        setup.actor_ref = self.node_setup.start().proxy().actor_ref
        setup.cloud_node.get.return_value = cloud_node
        setup.arvados_node.get.return_value = testutil.arvados_node_mock(1)
        self.daemon.update_cloud_nodes([cloud_node])
        self.wait_for_call(self.node_factory.start)
        self.node_factory.reset_mock()
        self.daemon.node_up(setup).get(self.TIMEOUT)
        self.assertFalse(self.node_factory.start.called)

    def test_booting_nodes_shut_down(self):
        self.make_daemon(want_sizes=[testutil.MockSize(1)])
        self.wait_for_call(self.node_setup.start)
        # An emptied wishlist should cancel the in-progress boot.
        self.daemon.update_server_wishlist([])
        self.wait_for_call(
            self.node_setup.start().proxy().stop_if_no_cloud_node)

    def test_shutdown_declined_at_wishlist_capacity(self):
        cloud_node = testutil.cloud_node_mock(1)
        self.make_daemon(cloud_nodes=[cloud_node],
                         want_sizes=[testutil.MockSize(1)])
        node_actor = self.node_factory().proxy()
        self.daemon.node_can_shutdown(node_actor).get(self.TIMEOUT)
        self.assertFalse(node_actor.shutdown.called)

    def test_shutdown_accepted_below_capacity(self):
        self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
        node_actor = self.node_factory().proxy()
        self.daemon.node_can_shutdown(node_actor)
        self.wait_for_call(self.node_shutdown.start)

    def test_clean_shutdown_waits_for_node_setup_finish(self):
        self.make_daemon(want_sizes=[testutil.MockSize(1)])
        self.wait_for_call(self.node_setup.start)
        new_node = self.node_setup.start().proxy()
        self.daemon.shutdown()
        self.wait_for_call(new_node.stop_if_no_cloud_node)
        self.daemon.node_up(new_node)
        self.wait_for_call(new_node.stop)
        self.assertTrue(
            self.daemon.actor_ref.actor_stopped.wait(self.TIMEOUT))

    def test_wishlist_ignored_after_shutdown(self):
        size = testutil.MockSize(2)
        self.make_daemon(want_sizes=[size])
        starter = self.node_setup.start
        self.wait_for_call(starter)
        starter.reset_mock()
        self.daemon.shutdown()
        self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
        # Send another message and wait for a response, to make sure all
        # internal messages generated by the wishlist update are processed.
        self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
        self.assertFalse(starter.called)
diff --git a/services/nodemanager/tests/test_jobqueue.py b/services/nodemanager/tests/test_jobqueue.py
new file mode 100644
index 0000000..3814ba4
--- /dev/null
+++ b/services/nodemanager/tests/test_jobqueue.py
@@ -0,0 +1,74 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import unittest
+
+import arvnodeman.jobqueue as jobqueue
+from . import testutil
+
class ServerCalculatorTestCase(unittest.TestCase):
    """Unit tests for jobqueue.ServerCalculator's queue-sizing logic."""

    def make_calculator(self, factors, **kwargs):
        # One size per factor, each advertising that many cores.
        sizes = [(testutil.MockSize(f), {'cores': f}) for f in factors]
        return jobqueue.ServerCalculator(sizes, **kwargs)

    def calculate(self, servcalc, *constraints):
        queue = [{'runtime_constraints': c} for c in constraints]
        return servcalc.servers_for_queue(queue)

    def test_empty_queue_needs_no_servers(self):
        calc = self.make_calculator([1])
        self.assertEqual([], calc.servers_for_queue([]))

    def test_easy_server_count(self):
        calc = self.make_calculator([1])
        self.assertEqual(3, len(self.calculate(calc, {'min_nodes': 3})))

    def test_implicit_server_count(self):
        # A job without min_nodes still counts as one node on top of the
        # explicitly requested ones.
        calc = self.make_calculator([1])
        self.assertEqual(4, len(self.calculate(calc, {}, {'min_nodes': 3})))

    def test_bad_min_nodes_override(self):
        # Negative or non-numeric min_nodes values each fall back to one
        # node rather than raising.
        calc = self.make_calculator([1])
        result = self.calculate(calc,
                                {'min_nodes': -2}, {'min_nodes': 'foo'})
        self.assertEqual(2, len(result))

    def test_ignore_unsatisfiable_jobs(self):
        # Jobs whose constraints exceed every available size (or the
        # max_nodes cap) contribute nothing to the wishlist.
        calc = self.make_calculator([1], max_nodes=9)
        result = self.calculate(calc,
                                {'min_cores_per_node': 2},
                                {'min_ram_mb_per_node': 256},
                                {'min_nodes': 6},
                                {'min_nodes': 12},
                                {'min_scratch_mb_per_node': 200})
        self.assertEqual(6, len(result))
+
+
class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
                                   unittest.TestCase):
    """Tests for the actor that polls the Arvados job queue."""

    TEST_CLASS = jobqueue.JobQueueMonitorActor

    class MockCalculator(object):
        # Stand-in calculator: maps each queue entry straight to a MockSize.
        @staticmethod
        def servers_for_queue(queue):
            return [testutil.MockSize(entry) for entry in queue]


    def build_monitor(self, side_effect, *args, **kwargs):
        super(JobQueueMonitorActorTestCase, self).build_monitor(*args, **kwargs)
        self.client.jobs().queue().execute.side_effect = side_effect

    def test_subscribers_get_server_lists(self):
        self.build_monitor([{'items': [1, 2]}], self.MockCalculator())
        self.monitor.subscribe(self.subscriber)
        self.wait_for_call(self.subscriber)
        expected = [testutil.MockSize(1), testutil.MockSize(2)]
        self.subscriber.assert_called_with(expected)
+
+
# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()
+
diff --git a/services/nodemanager/tests/test_nodelist.py b/services/nodemanager/tests/test_nodelist.py
new file mode 100644
index 0000000..d9f47e2
--- /dev/null
+++ b/services/nodemanager/tests/test_nodelist.py
@@ -0,0 +1,56 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import unittest
+
+import arvnodeman.nodelist as nodelist
+from . import testutil
+
class ArvadosNodeListMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
                                          unittest.TestCase):
    """Tests for the poller that watches the Arvados node list."""

    TEST_CLASS = nodelist.ArvadosNodeListMonitorActor

    def build_monitor(self, side_effect, *args, **kwargs):
        super(ArvadosNodeListMonitorActorTestCase, self).build_monitor(
            *args, **kwargs)
        self.client.nodes().list().execute.side_effect = side_effect

    def test_uuid_is_subscription_key(self):
        # A subscriber keyed on a node's UUID should receive that node
        # record when it appears in the polled list.
        arv_node = testutil.arvados_node_mock()
        self.build_monitor([{'items': [arv_node]}])
        self.monitor.subscribe_to(arv_node['uuid'], self.subscriber)
        self.wait_for_call(self.subscriber)
        self.subscriber.assert_called_with(arv_node)
+
+
class CloudNodeListMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
                                        unittest.TestCase):
    """Tests for the poller that watches the cloud provider's node list."""

    TEST_CLASS = nodelist.CloudNodeListMonitorActor

    class MockNode(object):
        # Minimal libcloud-style node object for the poller to inspect.
        def __init__(self, count):
            self.id = str(count)
            self.name = 'test{}.example.com'.format(count)
            self.private_ips = ['10.0.0.{}'.format(count)]
            self.public_ips = []
            self.size = None
            self.state = 0


    def build_monitor(self, side_effect, *args, **kwargs):
        super(CloudNodeListMonitorActorTestCase, self).build_monitor(
            *args, **kwargs)
        self.client.list_nodes.side_effect = side_effect

    def test_id_is_subscription_key(self):
        # A subscriber keyed on a cloud node's id should receive that node
        # object when it appears in the polled list.
        cloud_node = self.MockNode(1)
        self.build_monitor([[cloud_node]])
        self.monitor.subscribe_to('1', self.subscriber)
        self.wait_for_call(self.subscriber)
        self.subscriber.assert_called_with(cloud_node)
+
+
# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()
+
diff --git a/services/nodemanager/tests/test_timedcallback.py b/services/nodemanager/tests/test_timedcallback.py
new file mode 100644
index 0000000..60f7b81
--- /dev/null
+++ b/services/nodemanager/tests/test_timedcallback.py
@@ -0,0 +1,55 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import time
+import unittest
+
+import mock
+import pykka
+
+import arvnodeman.timedcallback as timedcallback
+from . import testutil
+
@testutil.no_sleep
class TimedCallBackActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
    """Tests for TimedCallBackActor's scheduling and delivery behavior.

    The class is decorated with no_sleep so the actor's polling does not
    actually wait.  (The original mailing-list text mangled the leading
    '@' of the decorator into ' at '; restored here.)
    """

    def test_immediate_turnaround(self):
        # A deadline already in the past is delivered right away.
        result = self.FUTURE_CLASS()
        actor = timedcallback.TimedCallBackActor.start().proxy()
        actor.schedule(time.time() - 1, result.set, 'immediate')
        self.assertEqual('immediate', result.get(self.TIMEOUT))

    def test_delayed_turnaround(self):
        result = self.FUTURE_CLASS()
        with mock.patch('time.time', return_value=0) as clock:
            actor = timedcallback.TimedCallBackActor.start().proxy()
            actor.schedule(1, result.set, 'delayed')
            # Nothing should arrive while the mocked clock is still before
            # the scheduled time.
            self.assertRaises(pykka.Timeout, result.get, .5)
            clock.return_value = 2
            self.assertEqual('delayed', result.get(self.TIMEOUT))

    def test_out_of_order_scheduling(self):
        early = self.FUTURE_CLASS()
        late = self.FUTURE_CLASS()
        with mock.patch('time.time', return_value=1.5) as clock:
            actor = timedcallback.TimedCallBackActor.start().proxy()
            # Schedule the later callback first; delivery must still follow
            # the deadlines, not submission order.
            actor.schedule(2, late.set, 'second')
            actor.schedule(1, early.set, 'first')
            self.assertEqual('first', early.get(self.TIMEOUT))
            self.assertRaises(pykka.Timeout, late.get, .1)
            clock.return_value = 3
            self.assertEqual('second', late.get(self.TIMEOUT))

    def test_dead_actors_ignored(self):
        # Delivering to a dead actor must not kill the timer actor itself.
        receiver = mock.Mock(name='dead_actor', spec=pykka.ActorRef)
        receiver.tell.side_effect = pykka.ActorDeadError
        actor = timedcallback.TimedCallBackActor.start().proxy()
        actor.schedule(time.time() - 1, receiver.tell, 'error')
        self.wait_for_call(receiver.tell)
        receiver.tell.assert_called_with('error')
        self.assertTrue(actor.actor_ref.is_alive(), "deliverer died")
+
+
# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()
+
diff --git a/services/nodemanager/tests/testutil.py b/services/nodemanager/tests/testutil.py
new file mode 100644
index 0000000..a33f76f
--- /dev/null
+++ b/services/nodemanager/tests/testutil.py
@@ -0,0 +1,81 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import time
+
+import mock
+import pykka
+
+no_sleep = mock.patch('time.sleep', lambda n: None)
+
def arvados_node_mock(node_num=99, job_uuid=None, age=0, **kwargs):
    """Return a dict shaped like an Arvados node API record.

    Passing job_uuid=True substitutes a canned job UUID.  The mocked SLURM
    state is 'idle' only when job_uuid is None, 'alloc' otherwise.  age is
    how many seconds ago the record was last modified.  Any extra keyword
    argument overrides the corresponding field.
    """
    if job_uuid is True:
        job_uuid = 'zzzzz-jjjjj-jobjobjobjobjob'
    record = {
        'uuid': 'zzzzz-yyyyy-12345abcde67890',
        'created_at': '2014-01-01T01:02:03Z',
        'modified_at': time.strftime('%Y-%m-%dT%H:%M:%SZ',
                                     time.gmtime(time.time() - age)),
        'hostname': 'compute{}'.format(node_num),
        'domain': 'zzzzz.arvadosapi.com',
        'ip_address': ip_address_mock(node_num),
        'job_uuid': job_uuid,
        'info': {'slurm_state': 'idle' if job_uuid is None else 'alloc'},
    }
    record.update(kwargs)
    return record
+
def cloud_node_mock(node_num=99):
    """Return a non-callable mock shaped like a libcloud Node object."""
    spec_attrs = ['id', 'name', 'state', 'public_ips', 'private_ips',
                  'driver', 'size', 'image', 'extra']
    node = mock.NonCallableMagicMock(spec_attrs, name='cloud_node')
    node.id = str(node_num)
    node.name = node.id
    node.private_ips = [ip_address_mock(node_num)]
    node.public_ips = []
    return node
+
def ip_address_mock(last_octet):
    """Return a 10.20.30.x test IP address with the given last octet."""
    return '.'.join(['10', '20', '30', '{}'.format(last_octet)])
+
class MockSize(object):
    """Stand-in for a libcloud NodeSize, parameterized by a scale factor.

    All resource figures (ram, disk, bandwidth, price) scale linearly with
    the factor; two sizes compare equal iff they were built from the same
    factor (their ids match).
    """

    def __init__(self, factor):
        self.id = 'z{}.test'.format(factor)
        self.name = self.id
        self.ram = 128 * factor
        self.disk = 100 * factor
        self.bandwidth = 16 * factor
        self.price = float(factor)
        self.extra = {}

    def __repr__(self):
        # Makes assertion failures readable instead of <MockSize object at ...>.
        return '<MockSize {}>'.format(self.id)

    def __eq__(self, other):
        # Return NotImplemented (rather than raising AttributeError) when
        # compared against an object without an id, so comparisons with
        # unrelated types degrade gracefully.
        try:
            return self.id == other.id
        except AttributeError:
            return NotImplemented

    def __ne__(self, other):
        # Explicit __ne__ keeps != consistent with == on Python 2.
        result = self.__eq__(other)
        return result if result is NotImplemented else not result

    def __hash__(self):
        # Defined alongside __eq__ so equal sizes hash equally and stay
        # usable as dict/set members (Python 3 would otherwise make the
        # class unhashable).
        return hash(self.id)
+
+
class MockTimer(object):
    """Timer stand-in that fires every scheduled callback immediately."""

    def schedule(self, want_time, callback, *args, **kwargs):
        # The requested time is ignored entirely; invoke now and hand back
        # the callback's return value.
        return callback(*args, **kwargs)
+
+
class ActorTestMixin(object):
    """Common scaffolding for test cases that start pykka actors."""

    FUTURE_CLASS = pykka.ThreadingFuture
    TIMEOUT = 5  # seconds; upper bound for waiting on actor activity

    def tearDown(self):
        # Make sure no actor outlives its test case.
        pykka.ActorRegistry.stop_all()

    def wait_for_call(self, mock_func, timeout=TIMEOUT):
        """Poll until mock_func has been called, then assert that it was."""
        deadline = time.time() + timeout
        while not mock_func.called:
            if time.time() >= deadline:
                break
            time.sleep(.1)
        self.assertTrue(mock_func.called, "{} not called".format(mock_func))
+
+
class RemotePollLoopActorTestMixin(ActorTestMixin):
    """Scaffolding for tests of remote-polling actors.

    Subclasses set TEST_CLASS to the actor class under test and call
    build_monitor to start it with mocked collaborators.
    """

    def build_monitor(self, *args, **kwargs):
        # Fresh mocks per test: the remote API client, the timer the actor
        # reschedules itself with, and one subscriber callback.
        self.client = mock.MagicMock(name='client_mock')
        self.timer = mock.MagicMock(name='timer_mock')
        self.subscriber = mock.Mock(name='subscriber_mock')
        self.monitor = self.TEST_CLASS.start(
            self.client, self.timer, *args, **kwargs).proxy()

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list