[ARVADOS] created: 05bb359e5cb312cc5d8ccae94762a4e20a041c5f

git at public.curoverse.com git at public.curoverse.com
Fri Oct 3 17:52:03 EDT 2014


        at  05bb359e5cb312cc5d8ccae94762a4e20a041c5f (commit)


commit 05bb359e5cb312cc5d8ccae94762a4e20a041c5f
Author: Brett Smith <brett at curoverse.com>
Date:   Fri Oct 3 17:53:57 2014 -0400

    2881: Add services/nodemanager.

diff --git a/services/nodemanager/.gitignore b/services/nodemanager/.gitignore
new file mode 100644
index 0000000..488ddd5
--- /dev/null
+++ b/services/nodemanager/.gitignore
@@ -0,0 +1,4 @@
+*.pyc
+*.egg-info
+build/
+dist/
diff --git a/services/nodemanager/arvnodeman/__init__.py b/services/nodemanager/arvnodeman/__init__.py
new file mode 100644
index 0000000..ef437c1
--- /dev/null
+++ b/services/nodemanager/arvnodeman/__init__.py
@@ -0,0 +1,19 @@
+#!/usr/bin/env python
+
+# First import and expose all the classes we want to export.
+from .computenode import \
+    ComputeNodeSetupActor, ComputeNodeShutdownActor, ComputeNodeActor, \
+    ShutdownTimer
+from .daemon import NodeManagerDaemonActor
+from .jobqueue import JobQueueMonitorActor, ServerCalculator
+from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
+from .timedcallback import TimedCallBackActor
+
+__all__ = [name for name in locals().keys() if name[0].isupper()]
+
+# We now return you to your regularly scheduled program.
+import _strptime  # See <http://bugs.python.org/issue7980#msg221094>.
+import logging
+
+logger = logging.getLogger('arvnodeman')
+logger.addHandler(logging.NullHandler())
diff --git a/services/nodemanager/arvnodeman/clientactor.py b/services/nodemanager/arvnodeman/clientactor.py
new file mode 100644
index 0000000..079cb03
--- /dev/null
+++ b/services/nodemanager/arvnodeman/clientactor.py
@@ -0,0 +1,77 @@
+#!/usr/bin/env python
+
+import logging
+import time
+
+import pykka
+
+from .config import actor_class
+
def _notify_subscribers(response, subscribers):
    """Call each subscriber with response, pruning dead actors.

    Any subscriber that raises pykka.ActorDeadError is removed from the
    subscribers set, which is mutated in place.
    """
    dead = set()
    for callback in subscribers:
        try:
            callback(response)
        except pykka.ActorDeadError:
            dead.add(callback)
    subscribers -= dead
+
class RemotePollLoopActor(actor_class):
    """Abstract actor that polls a remote service and notifies subscribers.

    Subclasses must define:
    * LOGGER_NAME: name passed to logging.getLogger.
    * CLIENT_ERRORS: tuple of exception classes treated as request failures.
    * _send_request(): performs one poll and returns the response.
    Subclasses that support per-key subscriptions also define
    _item_key(item); its presence enables the subscribe_to() interface.
    """
    def __init__(self, client, timer_actor, poll_wait=60, max_poll_wait=180):
        super(RemotePollLoopActor, self).__init__()
        self._client = client
        self._timer = timer_actor
        self._logger = logging.getLogger(self.LOGGER_NAME)
        # Proxy for sending messages to ourself through the actor mailbox.
        self._later = self.actor_ref.proxy()
        self.min_poll_wait = poll_wait
        self.max_poll_wait = max_poll_wait
        self.poll_wait = self.min_poll_wait
        self.last_poll_time = None
        self.all_subscribers = set()
        self.key_subscribers = {}
        # Only expose subscribe_to when the subclass can key items.
        if hasattr(self, '_item_key'):
            self.subscribe_to = self._subscribe_to

    def _start_polling(self):
        # Start the poll loop on the first subscription; later calls no-op.
        if self.last_poll_time is None:
            self.last_poll_time = time.time()
            self._later.poll()

    def subscribe(self, subscriber):
        """Call subscriber with the full response after every poll."""
        self.all_subscribers.add(subscriber)
        self._logger.debug("%r subscribed to all events", subscriber)
        self._start_polling()

    def _subscribe_to(self, key, subscriber):
        # Call subscriber with the single item matching key after each poll.
        self.key_subscribers.setdefault(key, set()).add(subscriber)
        self._logger.debug("%r subscribed to events for '%s'", subscriber, key)
        self._start_polling()

    def _send_request(self):
        raise NotImplementedError("subclasses must implement request method")

    def _got_response(self, response):
        self.poll_wait = self.min_poll_wait
        _notify_subscribers(response, self.all_subscribers)
        if hasattr(self, '_item_key'):
            items = {self._item_key(x): x for x in response}
            # Key subscribers whose key is absent from this response get None.
            for key, subscribers in self.key_subscribers.iteritems():
                _notify_subscribers(items.get(key), subscribers)

    def _got_error(self, error):
        # Back off exponentially, capped at max_poll_wait, while requests fail.
        self.poll_wait = min(self.poll_wait * 2, self.max_poll_wait)
        self._logger.warning("Client error: %s - waiting %s seconds",
                             error, self.poll_wait)

    def poll(self):
        """Poll the remote service once and schedule the next poll."""
        start_time = time.time()
        try:
            response = self._send_request()
        except self.CLIENT_ERRORS as error:
            # Count the wait from now, so failed polls don't pile up.
            self.last_poll_time = start_time
            self._got_error(error)
        else:
            # Advance from the previous deadline to keep a steady cadence.
            self.last_poll_time += self.poll_wait
            self._got_response(response)
        self._timer.schedule(self.last_poll_time + self.poll_wait,
                             self._later.poll)
diff --git a/services/nodemanager/arvnodeman/computenode/__init__.py b/services/nodemanager/arvnodeman/computenode/__init__.py
new file mode 100644
index 0000000..e31e7c1
--- /dev/null
+++ b/services/nodemanager/arvnodeman/computenode/__init__.py
@@ -0,0 +1,279 @@
+#!/usr/bin/env python
+
+import functools
+import itertools
+import logging
+import time
+
+import pykka
+
+from ..clientactor import _notify_subscribers
+from .. import config
+
def arvados_node_mtime(node):
    """Return node['modified_at'] as seconds since the UNIX epoch.

    The timestamp is an ISO 8601 UTC string, e.g. '2014-10-03T17:53:57Z'.
    """
    # calendar.timegm interprets the struct_time as UTC directly, avoiding
    # the DST- and locale-sensitive mktime()/time.timezone round trip that
    # also depended on strptime accepting the 'UTC' zone name via %Z.
    import calendar
    return calendar.timegm(time.strptime(node['modified_at'],
                                         '%Y-%m-%dT%H:%M:%SZ'))
+
def timestamp_fresh(timestamp, fresh_time):
    """Return True if timestamp is less than fresh_time seconds old."""
    age = time.time() - timestamp
    return age < fresh_time
+
def _retry(errors):
    """Decorator for actor methods that should retry on client errors.

    When the wrapped method raises one of `errors`, log a warning,
    reschedule the same call on the owning actor after self.retry_wait
    seconds, and double the wait (capped at self.max_retry_wait).  A
    successful call resets the wait to self.min_retry_wait.
    """
    def decorator(wrapped):
        @functools.wraps(wrapped)
        def retry_wrapper(self, *args, **kwargs):
            try:
                wrapped(self, *args, **kwargs)
            except errors as exc:
                self._logger.warning(
                    "Client error: %s - waiting %s seconds",
                    exc, self.retry_wait)
                retry_method = getattr(self._later, wrapped.__name__)
                self._timer.schedule(self.retry_wait, retry_method,
                                     *args, **kwargs)
                self.retry_wait = min(self.retry_wait * 2,
                                      self.max_retry_wait)
            else:
                self.retry_wait = self.min_retry_wait
        return retry_wrapper
    return decorator
+
class BaseComputeNodeDriver(object):
    """Abstract base class for cloud-provider-specific node drivers.

    Wraps a libcloud driver instance (self.real), adding the glue Node
    Manager needs: canned keyword arguments for list/create calls, plus
    Arvados-aware node creation hooks.  Subclasses that use search_for()
    must define a SEARCH_CACHE class dictionary.
    """
    def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class):
        # auth_kwargs are passed to the libcloud driver constructor;
        # list_kwargs/create_kwargs are merged into every
        # list_nodes/create_node call, respectively.
        self.real = driver_class(**auth_kwargs)
        self.list_kwargs = list_kwargs
        self.create_kwargs = create_kwargs

    def __getattr__(self, name):
        # Proxy non-extension methods to the real driver.
        if (not name.startswith('_') and not name.startswith('ex_')
              and hasattr(self.real, name)):
            return getattr(self.real, name)
        else:
            # object has no __getattr__, so delegating to super() here (as
            # an earlier revision did) raised a confusing AttributeError
            # about 'super'.  Raise the conventional one instead.
            raise AttributeError(name)

    def search_for(self, term, list_method, key=lambda item: item.id):
        """Return the one item from list_method whose key matches term.

        Results are memoized in the subclass's SEARCH_CACHE.  Raises
        ValueError unless exactly one item matches.
        """
        cache_key = (list_method, term)
        if cache_key not in self.SEARCH_CACHE:
            results = [item for item in getattr(self.real, list_method)()
                       if key(item) == term]
            count = len(results)
            if count != 1:
                raise ValueError("{} returned {} results for '{}'".format(
                        list_method, count, term))
            self.SEARCH_CACHE[cache_key] = results[0]
        return self.SEARCH_CACHE[cache_key]

    def list_nodes(self):
        return self.real.list_nodes(**self.list_kwargs)

    def arvados_create_kwargs(self, arvados_node):
        """Return driver create arguments derived from an Arvados node record."""
        raise NotImplementedError("BaseComputeNodeDriver.arvados_create_kwargs")

    def create_node(self, size, arvados_node):
        kwargs = self.create_kwargs.copy()
        kwargs.update(self.arvados_create_kwargs(arvados_node))
        kwargs['size'] = size
        return self.real.create_node(**kwargs)

    def post_create_node(self, cloud_node, arvados_node):
        # Hook for subclasses that need post-creation setup; default no-op.
        pass

    @classmethod
    def node_start_time(cls, node):
        raise NotImplementedError("BaseComputeNodeDriver.node_start_time")
+
+
+ComputeNodeDriverClass = BaseComputeNodeDriver
+
class ComputeNodeSetupActor(config.actor_class):
    """Actor to create and set up a new compute node.

    Creates (or recycles) an Arvados node record, then boots a matching
    cloud node, retrying each step on client errors.  Subscribers are
    notified with this actor's proxy once setup finishes.
    """
    def __init__(self, timer_actor, arvados_client, cloud_client,
                 cloud_size, arvados_node=None,
                 retry_wait=1, max_retry_wait=180):
        super(ComputeNodeSetupActor, self).__init__()
        self._timer = timer_actor
        self._arvados = arvados_client
        self._cloud = cloud_client
        self._later = self.actor_ref.proxy()
        self._logger = logging.getLogger('arvnodeman.nodeup')
        self.cloud_size = cloud_size
        self.subscribers = set()
        self.min_retry_wait = retry_wait
        self.max_retry_wait = max_retry_wait
        self.retry_wait = retry_wait
        self.arvados_node = None
        self.cloud_node = None
        # Reuse the given Arvados record if any, else create a fresh one.
        # Both paths continue on to create_cloud_node.
        if arvados_node is None:
            self._later.create_arvados_node()
        else:
            self._later.prepare_arvados_node(arvados_node)

    @_retry(config.ARVADOS_ERRORS)
    def create_arvados_node(self):
        self.arvados_node = self._arvados.nodes().create(body={}).execute()
        self._later.create_cloud_node()

    @_retry(config.ARVADOS_ERRORS)
    def prepare_arvados_node(self, node):
        # Wipe the record's assignment fields so it reads as a new node.
        self.arvados_node = self._arvados.nodes().update(
            uuid=node['uuid'],
            body={'hostname': None,
                  'ip_address': None,
                  'slot_number': None,
                  'first_ping_at': None,
                  'last_ping_at': None,
                  'info': {'ec2_instance_id': None,
                           'last_action': "Prepared by Node Manager"}}
            ).execute()
        self._later.create_cloud_node()

    @_retry(config.CLOUD_ERRORS)
    def create_cloud_node(self):
        self._logger.info("Creating cloud node with size %s.",
                          self.cloud_size.name)
        self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                  self.arvados_node)
        self._logger.info("Cloud node %s created.  Setting up.",
                          self.cloud_node.id)
        self._later.setup_cloud_node()

    @_retry(config.CLOUD_ERRORS)
    def setup_cloud_node(self):
        self._cloud.post_create_node(self.cloud_node, self.arvados_node)
        self._logger.info("Cloud node %s set up.", self.cloud_node.id)
        # Notify and drop subscribers; None also marks setup finished, so
        # late subscribers get called back immediately (see subscribe).
        _notify_subscribers(self._later, self.subscribers)
        self.subscribers = None

    def stop_if_no_cloud_node(self):
        # Lets the daemon cancel a setup that hasn't reached the cloud yet.
        if self.cloud_node is None:
            self.stop()

    def subscribe(self, subscriber):
        if self.subscribers is None:
            # Setup already finished; call back immediately.
            try:
                subscriber(self._later)
            except pykka.ActorDeadError:
                pass
        else:
            self.subscribers.add(subscriber)
+
+
class ComputeNodeShutdownActor(config.actor_class):
    """Actor to destroy a cloud node, retrying on cloud client errors."""
    def __init__(self, timer_actor, cloud_client, cloud_node,
                 retry_wait=1, max_retry_wait=180):
        super(ComputeNodeShutdownActor, self).__init__()
        self._timer = timer_actor
        self._cloud = cloud_client
        self._later = self.actor_ref.proxy()
        self._logger = logging.getLogger('arvnodeman.nodedown')
        self.cloud_node = cloud_node
        self.min_retry_wait = retry_wait
        self.max_retry_wait = max_retry_wait
        self.retry_wait = retry_wait
        # Kick off the shutdown as soon as the actor starts.
        self._later.shutdown_node()

    @_retry(config.CLOUD_ERRORS)
    def shutdown_node(self):
        self._cloud.destroy_node(self.cloud_node)
        self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
+
+
class ShutdownTimer(object):
    """Track the shutdown windows for a single cloud node.

    shutdown_windows is a sequence of durations in minutes: the node's
    first shutdown window opens shutdown_windows[0] minutes after
    start_time; subsequent values give alternating open/closed periods,
    cycled indefinitely.
    """
    def __init__(self, start_time, shutdown_windows):
        first_window = shutdown_windows[0]
        shutdown_windows = list(shutdown_windows[1:])
        self._next_opening = start_time + (60 * first_window)
        # Pad the cycle so open and closed periods alternate evenly.
        if not shutdown_windows:
            # Only one window configured: alternate open/closed periods
            # of that length.  (The in-place += below would IndexError
            # on the empty list.)
            shutdown_windows = [first_window]
        elif len(shutdown_windows) % 2:
            shutdown_windows.append(first_window)
        else:
            shutdown_windows[-1] += first_window
        self.shutdown_windows = itertools.cycle([60 * n
                                                 for n in shutdown_windows])
        self._open_start = self._next_opening
        self._open_for = next(self.shutdown_windows)

    def _advance_opening(self):
        # Roll _open_start/_next_opening forward past windows already over.
        while self._next_opening < time.time():
            self._open_start = self._next_opening
            self._next_opening += self._open_for + next(self.shutdown_windows)
            self._open_for = next(self.shutdown_windows)

    def next_opening(self):
        """Return the timestamp when the next shutdown window opens."""
        self._advance_opening()
        return self._next_opening

    def window_open(self):
        """Return True if a shutdown window is open right now."""
        self._advance_opening()
        return 0 < (time.time() - self._open_start) < self._open_for
+
+
class ComputeNodeActor(config.actor_class):
    """Actor to track one cloud node's state and suggest shutdowns.

    Watches updates for a cloud node (and its paired Arvados record,
    once known), and notifies subscribers whenever the node looks
    eligible for shutdown during an open shutdown window.
    """
    def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
                 timer_actor, arvados_node=None,
                 poll_stale_after=600, node_stale_after=3600):
        super(ComputeNodeActor, self).__init__()
        self._later = self.actor_ref.proxy()
        self._logger = logging.getLogger('arvnodeman.computenode')
        self._last_log = None
        self._shutdowns = shutdown_timer
        self._timer = timer_actor
        self.cloud_node = cloud_node
        self.cloud_node_start_time = cloud_node_start_time
        self.arvados_node = arvados_node
        self.poll_stale_after = poll_stale_after
        self.node_stale_after = node_stale_after
        self.subscribers = set()
        self.last_shutdown_opening = None
        self._later.consider_shutdown()

    def subscribe(self, subscriber):
        self.subscribers.add(subscriber)

    def _debug(self, msg, *args):
        # Suppress consecutive duplicate messages to keep logs readable.
        if msg == self._last_log:
            return
        self._last_log = msg
        self._logger.debug(msg, *args)

    def _shutdown_eligible(self):
        # Unpaired: eligible while the node is younger than
        # node_stale_after.  NOTE(review): that reads inverted at first
        # glance — confirm the intent is to reap nodes that never pair.
        # Paired: eligible while the Arvados record is fresh and SLURM
        # reports the node idle.
        if self.arvados_node is None:
            return timestamp_fresh(self.cloud_node_start_time,
                                   self.node_stale_after)
        else:
            return (timestamp_fresh(arvados_node_mtime(self.arvados_node),
                                    self.poll_stale_after) and
                    (self.arvados_node['info'].get('slurm_state') == 'idle'))

    def consider_shutdown(self):
        """Suggest a shutdown if a window is open and the node is eligible."""
        next_opening = self._shutdowns.next_opening()
        if self._shutdowns.window_open():
            if self._shutdown_eligible():
                self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
                _notify_subscribers(self._later, self.subscribers)
            else:
                self._debug("Node %s shutdown window open but node busy.",
                            self.cloud_node.id)
        else:
            self._debug("Node %s shutdown window closed.  Next at %s.",
                        self.cloud_node.id, time.ctime(next_opening))
        # Re-check when the next window opens; schedule only once per window.
        if self.last_shutdown_opening != next_opening:
            self._timer.schedule(next_opening, self._later.consider_shutdown)
            self.last_shutdown_opening = next_opening

    def offer_arvados_pair(self, arvados_node):
        # Pair with arvados_node if its IP matches ours; return our cloud
        # node ID on success, None otherwise (including when already paired).
        if self.arvados_node is not None:
            return None
        elif arvados_node['ip_address'] in self.cloud_node.private_ips:
            self.arvados_node = arvados_node
            return self.cloud_node.id
        else:
            return None

    def update_cloud_node(self, cloud_node):
        if cloud_node is not None:
            self.cloud_node = cloud_node
            self._later.consider_shutdown()

    def update_arvados_node(self, arvados_node):
        if arvados_node is not None:
            self.arvados_node = arvados_node
            self._later.consider_shutdown()
diff --git a/services/nodemanager/arvnodeman/computenode/dummy.py b/services/nodemanager/arvnodeman/computenode/dummy.py
new file mode 100644
index 0000000..3a05891
--- /dev/null
+++ b/services/nodemanager/arvnodeman/computenode/dummy.py
@@ -0,0 +1,42 @@
+#!/usr/bin/env python
+
+import time
+
+import libcloud.compute.providers as cloud_provider
+import libcloud.compute.types as cloud_types
+
+from . import BaseComputeNodeDriver
+
class ComputeNodeDriver(BaseComputeNodeDriver):
    """Driver for libcloud's dummy provider, for testing Node Manager.

    Fakes the private IP addresses that the dummy provider doesn't
    supply, and shares one dummy driver instance so node state persists
    across ComputeNodeDriver objects.
    """
    DEFAULT_DRIVER = cloud_provider.get_driver(cloud_types.Provider.DUMMY)
    DEFAULT_REAL = DEFAULT_DRIVER('ComputeNodeDriver')
    DUMMY_START_TIME = time.time()

    def __init__(self, auth_kwargs, list_kwargs, create_kwargs,
                 driver_class=DEFAULT_DRIVER):
        super(ComputeNodeDriver, self).__init__(
            auth_kwargs, list_kwargs, create_kwargs, driver_class)
        # The dummy driver keeps node state on the instance, so use the
        # shared instance to keep state across driver objects.
        if driver_class is self.DEFAULT_DRIVER:
            self.real = self.DEFAULT_REAL

    def _ensure_private_ip(self, node):
        # The dummy provider doesn't assign private IPs; invent one.
        if not node.private_ips:
            node.private_ips = ['10.10.0.{}'.format(node.id)]

    def arvados_create_kwargs(self, arvados_node):
        return {}

    def list_nodes(self):
        nodelist = super(ComputeNodeDriver, self).list_nodes()
        for node in nodelist:
            self._ensure_private_ip(node)
        return nodelist

    def create_node(self, size, arvados_node):
        node = super(ComputeNodeDriver, self).create_node(size, arvados_node)
        self._ensure_private_ip(node)
        return node

    @classmethod
    def node_start_time(cls, node):
        # All dummy nodes pretend to have started when the module loaded.
        return cls.DUMMY_START_TIME
diff --git a/services/nodemanager/arvnodeman/computenode/ec2.py b/services/nodemanager/arvnodeman/computenode/ec2.py
new file mode 100644
index 0000000..9f6bea7
--- /dev/null
+++ b/services/nodemanager/arvnodeman/computenode/ec2.py
@@ -0,0 +1,88 @@
+#!/usr/bin/env python
+
+import time
+
+import libcloud.compute.base as cloud_base
+import libcloud.compute.providers as cloud_provider
+import libcloud.compute.types as cloud_types
+from libcloud.compute.drivers import ec2 as cloud_ec2
+
+from . import BaseComputeNodeDriver
+
+### Monkeypatch libcloud to support AWS' new SecurityGroup API.
class ANMEC2Connection(cloud_ec2.EC2Connection):
    """EC2 connection that sends security groups by ID, not name.

    Rewrites RunInstances requests so each SecurityGroup.N parameter
    (holding a security group object) becomes SecurityGroupId.N with
    the group's ID, as AWS' newer SecurityGroup API expects.
    """
    def request(self, *args, **kwargs):
        params = kwargs.get('params')
        if (params is not None) and (params.get('Action') == 'RunInstances'):
            # Snapshot the keys up front: we mutate params while scanning
            # it, which would be a RuntimeError over a live keys() view.
            for key in list(params.keys()):
                if key.startswith('SecurityGroup.'):
                    new_key = key.replace('Group.', 'GroupId.', 1)
                    params[new_key] = params.pop(key).id
            kwargs['params'] = params
        return super(ANMEC2Connection, self).request(*args, **kwargs)
+
+
class ANMEC2NodeDriver(cloud_ec2.EC2NodeDriver):
    # EC2 driver using the patched connection class above, so
    # RunInstances sends security group IDs instead of names.
    connectionCls = ANMEC2Connection
+
+
class ComputeNodeDriver(BaseComputeNodeDriver):
    """EC2 driver for Node Manager, built on the patched libcloud driver."""
    DEFAULT_DRIVER = ANMEC2NodeDriver
### End monkeypatch
    SEARCH_CACHE = {}

    def __init__(self, auth_kwargs, list_kwargs, create_kwargs,
                 driver_class=DEFAULT_DRIVER):
        # We need full lists of keys up front because these loops modify
        # dictionaries in-place.
        # Config keys use '_' where EC2 filters expect ':' (e.g. tag_Name).
        for key in list_kwargs.keys():
            list_kwargs[key.replace('_', ':')] = list_kwargs.pop(key)
        self.tags = {key[4:]: value
                     for key, value in list_kwargs.iteritems()
                     if key.startswith('tag:')}
        super(ComputeNodeDriver, self).__init__(
            auth_kwargs, {'ex_filters': list_kwargs}, create_kwargs,
            driver_class)
        # Each create_kwargs entry may have an _init_<key> hook below that
        # converts the raw config value into real driver arguments.
        for key in self.create_kwargs.keys():
            init_method = getattr(self, '_init_' + key, None)
            if init_method is not None:
                new_pair = init_method(self.create_kwargs.pop(key))
                if new_pair is not None:
                    self.create_kwargs[new_pair[0]] = new_pair[1]

    def _init_image_id(self, image_id):
        # Resolve the configured AMI ID to a libcloud image object.
        return 'image', self.search_for(image_id, 'list_images')

    def _init_ping_host(self, ping_host):
        # Consumed here (for building the ping URL); not a driver argument.
        self.ping_host = ping_host

    def _init_security_groups(self, group_names):
        # Comma-separated group names -> list of security group objects.
        return 'ex_security_groups', [
            self.search_for(gname.strip(), 'ex_get_security_groups')
            for gname in group_names.split(',')]

    def _init_subnet_id(self, subnet_id):
        return 'ex_subnet', self.search_for(subnet_id, 'ex_list_subnets')

    def _init_ssh_key(self, filename):
        with open(filename) as ssh_file:
            key = cloud_base.NodeAuthSSHKey(ssh_file.read())
        return 'auth', key

    def arvados_create_kwargs(self, arvados_node):
        """Build EC2 create_node arguments from an Arvados node record."""
        result = {'ex_metadata': self.tags.copy(),
                  'name': '{}.{}'.format(arvados_node['hostname'],
                                         arvados_node['domain'])}
        ping_secret = arvados_node['info'].get('ping_secret')
        if ping_secret is not None:
            # Pass the ping URL as user data so the new node can check in.
            ping_url = ('https://{}/arvados/v1/nodes/{}/ping?ping_secret={}'.
                        format(self.ping_host, arvados_node['uuid'],
                               ping_secret))
            result['ex_userdata'] = ping_url
        return result

    @classmethod
    def node_start_time(cls, node):
        """Return the node's EC2 launch time as seconds since the epoch."""
        # Drops fractional seconds and converts from UTC.  Assumes
        # launch_time looks like '2014-10-03T17:53:57.000Z' (fractional
        # form, so the trailing 'Z' is discarded by the split) — TODO
        # confirm against libcloud's EC2 'launch_time' extra.
        time_str = node.extra['launch_time'].split('.', 2)[0] + 'UTC'
        return time.mktime(time.strptime(
                time_str,'%Y-%m-%dT%H:%M:%S%Z')) - time.timezone
diff --git a/services/nodemanager/arvnodeman/config.py b/services/nodemanager/arvnodeman/config.py
new file mode 100644
index 0000000..2cfaaee
--- /dev/null
+++ b/services/nodemanager/arvnodeman/config.py
@@ -0,0 +1,97 @@
+#!/usr/bin/env python
+
+import ConfigParser
+import importlib
+import logging
+import ssl
+
+import apiclient.errors as apierror
+import arvados
+import httplib2
+import libcloud.common.types as cloud_types
+import pykka
+
# IOError is the base class for socket.error and friends.
# It seems like it hits the sweet spot for operations we want to retry:
# it's low-level, but unlikely to catch code bugs.
NETWORK_ERRORS = (IOError, ssl.SSLError)
# Errors retried when talking to the Arvados API server.
ARVADOS_ERRORS = NETWORK_ERRORS + (apierror.Error,)
# Errors retried when talking to the cloud provider.
CLOUD_ERRORS = NETWORK_ERRORS + (cloud_types.LibcloudError,)

# Base class for every actor in the program; a single alias makes it
# easy to swap the pykka actor implementation in one place.
actor_class = pykka.ThreadingActor
+
class NodeManagerConfig(ConfigParser.SafeConfigParser):
    """Node Manager configuration: defaults plus typed accessors."""

    # 'Logging' options that name destinations rather than log levels.
    LOGGING_NONLEVELS = frozenset(['file'])

    def __init__(self, *args, **kwargs):
        # Can't use super() because SafeConfigParser is an old-style class.
        ConfigParser.SafeConfigParser.__init__(self, *args, **kwargs)
        # Seed default values without clobbering anything already set.
        for sec_name, settings in {
            'Arvados': {'insecure': 'no',
                        'timeout': '15'},
            'Daemon': {'max_nodes': '1',
                       'poll_time': '60',
                       'max_poll_time': '300',
                       'poll_stale_after': '600',
                       'node_stale_after': str(60 * 60 * 2)},
            'Logging': {'file': '/dev/stderr',
                        'level': 'WARNING'},
        }.iteritems():
            if not self.has_section(sec_name):
                self.add_section(sec_name)
            for opt_name, value in settings.iteritems():
                if not self.has_option(sec_name, opt_name):
                    self.set(sec_name, opt_name, value)

    def get_section(self, section, transformer=None):
        """Return a section's options as a dict.

        If transformer is given, values it can convert are converted;
        values that raise TypeError/ValueError stay strings.
        """
        result = self._dict()
        for key, value in self.items(section):
            if transformer is not None:
                try:
                    value = transformer(value)
                except (TypeError, ValueError):
                    pass
            result[key] = value
        return result

    def log_levels(self):
        """Map logger names from the Logging section to logging levels."""
        return {key: getattr(logging, self.get('Logging', key).upper())
                for key in self.options('Logging')
                if key not in self.LOGGING_NONLEVELS}

    def new_arvados_client(self):
        """Build an Arvados API client from the Arvados/Daemon sections."""
        if self.has_option('Daemon', 'certs_file'):
            certs_file = self.get('Daemon', 'certs_file')
        else:
            certs_file = None
        insecure = self.getboolean('Arvados', 'insecure')
        http = httplib2.Http(timeout=self.getint('Arvados', 'timeout'),
                             ca_certs=certs_file,
                             disable_ssl_certificate_validation=insecure)
        return arvados.api('v1',
                           cache=False,  # Don't reuse an existing client.
                           host=self.get('Arvados', 'host'),
                           token=self.get('Arvados', 'token'),
                           insecure=insecure,
                           http=http)

    def new_cloud_client(self):
        """Instantiate the ComputeNodeDriver named by the Cloud section."""
        module = importlib.import_module('arvnodeman.computenode.' +
                                         self.get('Cloud', 'provider'))
        return module.ComputeNodeDriver(self.get_section('Cloud Credentials'),
                                        self.get_section('Cloud List'),
                                        self.get_section('Cloud Create'))

    def node_sizes(self, all_sizes):
        """Pair each known cloud size with its [Size ...] section settings.

        Returns a list of (size, kwargs) for the sizes in all_sizes that
        have a matching configuration section; others are dropped.
        """
        size_kwargs = {}
        for sec_name in self.sections():
            sec_words = sec_name.split(None, 2)
            if sec_words[0] != 'Size':
                continue
            size_kwargs[sec_words[1]] = self.get_section(sec_name, int)
        return [(size, size_kwargs[size.id]) for size in all_sizes
                if size.id in size_kwargs]

    def shutdown_windows(self):
        """Return the Cloud section's shutdown_windows as a list of ints."""
        return [int(n)
                for n in self.get('Cloud', 'shutdown_windows').split(',')]
diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
new file mode 100644
index 0000000..8d8f107
--- /dev/null
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -0,0 +1,263 @@
+#!/usr/bin/env python
+
+import functools
+import logging
+import time
+
+import pykka
+
+from . import computenode as cnode
+from .config import actor_class
+
+class NodeManagerDaemonActor(actor_class):
    class PairingTracker(object):
        """Classify a poll response's items as new, unpaired, or gone.

        Wraps the daemon's paired/unpaired dictionaries (keyed by
        key_func) and compares a fresh listing against them.  Iterate
        one of new_items/unpaired_items over a response first; then
        unseen_items yields the keys the response no longer includes.
        """
        def __init__(self, key_func, paired_items, unpaired_items):
            self.key_func = key_func
            self._paired_items = paired_items
            self._unpaired_items = unpaired_items

        def all_items(self, response):
            # Yield (key, item) for everything in the response, removing
            # each seen key from self.unseen as iteration proceeds.
            self.unseen = set(self._paired_items.iterkeys())
            self.unseen.update(self._unpaired_items.iterkeys())
            for item in response:
                key = self.key_func(item)
                yield key, item
                if key in self.unseen:
                    self.unseen.remove(key)

        def new_items(self, response):
            # Items whose keys aren't tracked in either dictionary yet.
            # (The key is removed from unseen only after the yield, so
            # known keys are still present when this check runs.)
            for key, item in self.all_items(response):
                if key not in self.unseen:
                    yield key, item

        def unpaired_items(self, response):
            # Items not yet paired: brand-new or known-but-unpaired.
            for key, item in self.all_items(response):
                if key not in self._paired_items:
                    yield key, item

        def unseen_items(self):
            # After iterating a response: (owning dict, key) for every
            # tracked key the response no longer includes.
            for key in self.unseen:
                if key in self._paired_items:
                    home_dict = self._paired_items
                else:
                    home_dict = self._unpaired_items
                yield home_dict, key
+
+
    def __init__(self, server_wishlist_actor, arvados_nodes_actor,
                 cloud_nodes_actor, timer_actor,
                 arvados_factory, cloud_factory,
                 shutdown_windows, max_nodes,
                 poll_stale_after=600, node_stale_after=7200,
                 node_setup_class=cnode.ComputeNodeSetupActor,
                 node_shutdown_class=cnode.ComputeNodeShutdownActor,
                 node_actor_class=cnode.ComputeNodeActor):
        """Set up daemon state and subscribe to the three monitor actors."""
        super(NodeManagerDaemonActor, self).__init__()
        self._node_setup = node_setup_class
        self._node_shutdown = node_shutdown_class
        self._node_actor = node_actor_class
        self._timer = timer_actor
        self._new_arvados = arvados_factory
        self._new_cloud = cloud_factory
        self._cloud_driver = self._new_cloud()
        self._logger = logging.getLogger('arvnodeman.daemon')
        self._later = self.actor_ref.proxy()
        self.shutdown_windows = shutdown_windows
        self.max_nodes = max_nodes
        self.poll_stale_after = poll_stale_after
        self.node_stale_after = node_stale_after
        self.last_polls = {}
        # Wire each monitor actor to its update_* handler, stash it as
        # self._<name>_actor, and seed last_polls so the freshness check
        # fails until each list has actually been polled once.
        for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
            poll_actor = locals()[poll_name + '_actor']
            poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
            setattr(self, '_{}_actor'.format(poll_name), poll_actor)
            self.last_polls[poll_name] = -self.poll_stale_after
        # Map cloud node IDs, or Arvados node UUIDs, to their ComputeNodeActors.
        self.unpaired_clouds = {}
        self.paired_clouds = {}
        self.paired_arv = {}
        self.unpaired_arv = {}  # Arvados node UUIDs to full node data
        self.assigned_arv = {}  # Arvados node UUIDs to assignment timestamps
        self.booting = {}       # Actor IDs to ComputeNodeSetupActors
        self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
        self._logger.debug("Daemon initialized")
+
    def _update_poll_time(self, poll_key):
        # Record that the named list was just refreshed.
        self.last_polls[poll_key] = time.time()
+
    def _pair_nodes(self, cloud_key, arv_key, actor=None):
        """Record that a cloud node and an Arvados record belong together."""
        if actor is None:
            actor = self.unpaired_clouds[cloud_key]
        self._logger.info("Cloud node %s has associated with Arvados node %s",
                          cloud_key, arv_key)
        self.paired_clouds[cloud_key] = actor
        self.paired_arv[arv_key] = actor
        # Keep the node actor updated with future Arvados record changes.
        self._arvados_nodes_actor.subscribe_to(arv_key,
                                               actor.update_arvados_node)
        self.unpaired_clouds.pop(cloud_key, None)
        self.unpaired_arv.pop(arv_key, None)
+
    def _new_node(self, cloud_node, arvados_node=None):
        """Start a ComputeNodeActor for cloud_node and wire up its updates."""
        start_time = self._cloud_driver.node_start_time(cloud_node)
        shutdown_timer = cnode.ShutdownTimer(start_time,
                                             self.shutdown_windows)
        actor = self._node_actor.start(
            cloud_node=cloud_node,
            cloud_node_start_time=start_time,
            shutdown_timer=shutdown_timer,
            timer_actor=self._timer,
            arvados_node=arvados_node,
            poll_stale_after=self.poll_stale_after,
            node_stale_after=self.node_stale_after).proxy()
        actor.subscribe(self._later.shutdown_offer)
        self._cloud_nodes_actor.subscribe_to(cloud_node.id,
                                             actor.update_cloud_node)
        if arvados_node is not None:
            self._pair_nodes(cloud_node.id, arvados_node['uuid'], actor)
        return actor
+
    def update_cloud_nodes(self, nodelist):
        """Handle a fresh cloud node listing: start, pair, and reap actors."""
        self._update_poll_time('cloud_nodes')
        pairs = self.PairingTracker(lambda n: n.id,
                                    self.paired_clouds, self.unpaired_clouds)
        for key, node in pairs.new_items(nodelist):
            actor = self._new_node(node)
            # Try to pair the new cloud node with a known Arvados record.
            for arv_key, arv_node in self.unpaired_arv.iteritems():
                if actor.offer_arvados_pair(arv_node).get():
                    self._pair_nodes(key, arv_key, actor)
                    break
            else:
                self._logger.info("Registering new cloud node %s", key)
                self.unpaired_clouds[key] = actor
        # Nodes gone from the listing: stop their (and any shutdown) actors.
        for source, key in pairs.unseen_items():
            source.pop(key).stop()
            if key in self.shutdowns:
                self.shutdowns.pop(key).stop()
+
    def update_arvados_nodes(self, nodelist):
        """Handle a fresh Arvados node listing: register and pair records."""
        self._update_poll_time('arvados_nodes')
        pairs = self.PairingTracker(lambda n: n['uuid'],
                                    self.paired_arv, self.unpaired_arv)
        for key, node in pairs.unpaired_items(nodelist):
            if key not in self.unpaired_arv:
                self._logger.info("Registering new Arvados node %s", key)
            self.unpaired_arv[key] = node
            # Try to pair the record with a known unpaired cloud node.
            for cloud_key, actor in self.unpaired_clouds.iteritems():
                if actor.offer_arvados_pair(node).get():
                    self._pair_nodes(cloud_key, key, actor)
                    break
        # Forget unpaired records that dropped out of the listing.
        for source, key in pairs.unseen_items():
            if source is self.unpaired_arv:
                del self.unpaired_arv[key]
+
+    def _node_count(self):
+        up = sum(len(nodelist) for nodelist in
+                 [self.paired_clouds, self.unpaired_clouds, self.booting])
+        return up - len(self.shutdowns)
+
+    def _nodes_wanted(self):
+        return len(self.last_wishlist) - self._node_count()
+
+    def _nodes_excess(self):
+        return -self._nodes_wanted()
+
+    def update_server_wishlist(self, wishlist):
+        self._update_poll_time('server_wishlist')
+        self.last_wishlist = wishlist[:self.max_nodes]
+        nodes_wanted = self._nodes_wanted()
+        if nodes_wanted > 0:
+            self._later.start_node()
+        elif (nodes_wanted < 0) and self.booting:
+            self._later.stop_booting_node()
+
+    def _check_poll_freshness(orig_func):
+        @functools.wraps(orig_func)
+        def wrapper(self, *args, **kwargs):
+            now = time.time()
+            if all(now - t < self.poll_stale_after
+                   for t in self.last_polls.itervalues()):
+                return orig_func(self, *args, **kwargs)
+            else:
+                return None
+        return wrapper
+
+    def _find_reusable_arvados_node(self):
+        for node in self.unpaired_arv.itervalues():
+            assigned_at = self.assigned_arv.get(node['uuid'],
+                                                -self.node_stale_after)
+            if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
+                                          self.node_stale_after) and
+                not cnode.timestamp_fresh(assigned_at,
+                                          self.node_stale_after)):
+                return node
+        return None
+
+    @_check_poll_freshness
+    def start_node(self):
+        nodes_wanted = self._nodes_wanted()
+        if nodes_wanted < 1:
+            return None
+        arvados_node = self._find_reusable_arvados_node()
+        size = self.last_wishlist[nodes_wanted - 1]
+        self._logger.info("Want %s more nodes.  Booting a %s node.",
+                          nodes_wanted, size.name)
+        new_setup = self._node_setup.start(
+            timer_actor=self._timer,
+            arvados_client=self._new_arvados(),
+            arvados_node=arvados_node,
+            cloud_client=self._new_cloud(),
+            cloud_size=size).proxy()
+        self.booting[new_setup.actor_ref.actor_urn] = new_setup
+        if arvados_node is not None:
+            self.assigned_arv[arvados_node['uuid']] = time.time()
+        new_setup.subscribe(self._later.node_up)
+        if nodes_wanted > 1:
+            self._later.start_node()
+
+    def _actor_nodes(self, node_actor):
+        return pykka.get_all([node_actor.cloud_node, node_actor.arvados_node])
+
+    def node_up(self, setup_proxy):
+        cloud_node, arvados_node = self._actor_nodes(setup_proxy)
+        self._new_node(cloud_node, arvados_node)
+        del self.booting[setup_proxy.actor_ref.actor_urn]
+        self.assigned_arv.pop(arvados_node['uuid'], None)
+        setup_proxy.stop()
+
+    @_check_poll_freshness
+    def stop_booting_node(self):
+        nodes_excess = self._nodes_excess()
+        if (nodes_excess < 1) or not self.booting:
+            return None
+        for key, node in self.booting.iteritems():
+            node.stop_if_no_cloud_node().get()
+            if not node.actor_ref.is_alive():
+                del self.booting[key]
+                if nodes_excess > 1:
+                    self._later.stop_booting_node()
+                break
+
+    @_check_poll_freshness
+    def shutdown_offer(self, node_actor):
+        if self._nodes_excess() < 1:
+            return None
+        cloud_node, arvados_node = self._actor_nodes(node_actor)
+        if cloud_node.id in self.shutdowns:
+            return None
+        shutdown = self._node_shutdown.start(timer_actor=self._timer,
+                                             cloud_client=self._new_cloud(),
+                                             cloud_node=cloud_node).proxy()
+        self.shutdowns[cloud_node.id] = shutdown
+
+    def shutdown(self):
+        self._logger.info("Shutting down after signal.")
+        self.poll_stale_after = -1  # Inhibit starting/stopping nodes
+        for bootnode in self.booting.itervalues():
+            bootnode.stop_if_no_cloud_node()
+        self._later.await_shutdown()
+
+    def await_shutdown(self):
+        if any(node.actor_ref.is_alive() for node in self.booting.itervalues()):
+            self._timer.schedule(time.time() + 1, self._later.await_shutdown)
+        else:
+            self.stop()
diff --git a/services/nodemanager/arvnodeman/jobqueue.py b/services/nodemanager/arvnodeman/jobqueue.py
new file mode 100644
index 0000000..9daf556
--- /dev/null
+++ b/services/nodemanager/arvnodeman/jobqueue.py
@@ -0,0 +1,78 @@
+#!/usr/bin/env python
+
+import arvados.errors as arverror
+
+from . import clientactor
+
+class ServerCalculator(object):
+    class SizeWrapper(object):
+        def __init__(self, real_size, **kwargs):
+            self.real = real_size
+            for name in ['id', 'name', 'ram', 'disk', 'bandwidth', 'price',
+                         'extra']:
+                setattr(self, name, getattr(self.real, name))
+            self.cores = kwargs.pop('cores')
+            self.scratch = self.disk
+            for name, override in kwargs.iteritems():
+                if not hasattr(self, name):
+                    raise ValueError("unrecognized size field '%s'" % (name,))
+                setattr(self, name, override)
+
+        def meets_constraints(self, **kwargs):
+            for name, want_value in kwargs.iteritems():
+                have_value = getattr(self, name)
+                if (have_value != 0) and (have_value < want_value):
+                    return False
+            return True
+
+
+    def __init__(self, server_list, max_nodes=None):
+        self.sizes = [self.SizeWrapper(s, **kws) for s, kws in server_list]
+        self.sizes.sort(key=lambda s: s.price)
+        self.max_nodes = max_nodes or float("inf")
+
+    @staticmethod
+    def coerce_int(x, fallback):
+        try:
+            return int(x)
+        except (TypeError, ValueError):
+            return fallback
+
+    def size_for_constraints(self, constraints):
+        want_value = lambda key: self.coerce_int(constraints.get(key), 0)
+        wants = {'cores': want_value('min_cores_per_node'),
+                 'ram': want_value('min_ram_mb_per_node'),
+                 'scratch': want_value('min_scratch_mb_per_node')}
+        for size in self.sizes:
+            if size.meets_constraints(**wants):
+                return size
+        return None
+
+    def servers_for_queue(self, queue):
+        servers = []
+        for job in queue:
+            constraints = job['runtime_constraints']
+            want_count = self.coerce_int(constraints.get('min_nodes'), 1)
+            size = self.size_for_constraints(constraints)
+            if (want_count < self.max_nodes) and (size is not None):
+                servers.extend([size.real] * max(1, want_count))
+        return servers
+
+
+class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
+    CLIENT_ERRORS = (arverror.ApiError,)
+    LOGGER_NAME = 'arvnodeman.jobqueue'
+
+    def __init__(self, client, timer_actor, server_calc, *args, **kwargs):
+        super(JobQueueMonitorActor, self).__init__(
+            client, timer_actor, *args, **kwargs)
+        self._calculator = server_calc
+
+    def _send_request(self):
+        return self._client.jobs().queue().execute()['items']
+
+    def _got_response(self, queue):
+        server_list = self._calculator.servers_for_queue(queue)
+        self._logger.debug("Sending server wishlist: %s",
+                           ', '.join(s.name for s in server_list))
+        return super(JobQueueMonitorActor, self)._got_response(server_list)
diff --git a/services/nodemanager/arvnodeman/launcher.py b/services/nodemanager/arvnodeman/launcher.py
new file mode 100644
index 0000000..7539a9b
--- /dev/null
+++ b/services/nodemanager/arvnodeman/launcher.py
@@ -0,0 +1,120 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import argparse
+import logging
+import signal
+import sys
+import time
+
+import daemon
+import pykka
+
+from . import *  # This imports all the Actor classes.
+from . import config as nmconfig
+
+node_daemon = None
+
+def abort(msg, code=1):
+    print("arvados-node-manager: " + msg)
+    sys.exit(code)
+
+def parse_cli(args):
+    parser = argparse.ArgumentParser(
+        prog='arvados-node-manager',
+        description="Dynamically allocate Arvados cloud compute nodes")
+    parser.add_argument(
+        '--foreground', action='store_true', default=False,
+        help="Run in the foreground.  Don't daemonize.")
+    parser.add_argument(
+        '--config', help="Path to configuration file")
+    return parser.parse_args(args)
+
+def load_config(path):
+    if not path:
+        abort("No --config file specified", 2)
+    config = nmconfig.NodeManagerConfig()
+    try:
+        with open(path) as config_file:
+            config.readfp(config_file)
+    except (IOError, OSError) as error:
+        abort("Error reading configuration file {}: {}".format(path, error))
+    return config
+
+def setup_logging(path, level, **sublevels):
+    handler = logging.FileHandler(path)
+    handler.setFormatter(logging.Formatter(
+            '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
+            '%Y-%m-%d %H:%M:%S'))
+    root_logger = logging.getLogger()
+    root_logger.addHandler(handler)
+    root_logger.setLevel(level)
+    for logger_name, sublevel in sublevels.iteritems():
+        sublogger = logging.getLogger(logger_name)
+        sublogger.setLevel(sublevel)
+
+def launch_pollers(config):
+    cloud_client = config.new_cloud_client()
+    arvados_client = config.new_arvados_client()
+    size_list = config.node_sizes(cloud_client.list_sizes())
+    if not size_list:
+        abort("No valid sizes configured")
+
+    server_calculator = ServerCalculator(
+        size_list, config.getint('Daemon', 'max_nodes'))
+    poll_time = config.getint('Daemon', 'poll_time')
+    max_poll_time = config.getint('Daemon', 'max_poll_time')
+
+    timer = TimedCallBackActor.start(poll_time / 10.0).proxy()
+    cloud_node_poller = CloudNodeListMonitorActor.start(
+        cloud_client, timer, poll_time, max_poll_time).proxy()
+    arvados_node_poller = ArvadosNodeListMonitorActor.start(
+        arvados_client, timer, poll_time, max_poll_time).proxy()
+    job_queue_poller = JobQueueMonitorActor.start(
+        config.new_arvados_client(), timer, server_calculator,
+        poll_time, max_poll_time).proxy()
+    return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
+
+_caught_signals = {}
+def shutdown_signal(signal_code, frame):
+    current_count = _caught_signals.get(signal_code, 0)
+    _caught_signals[signal_code] = current_count + 1
+    if node_daemon is None:
+        sys.exit(-signal_code)
+    elif current_count == 0:
+        node_daemon.shutdown()
+    elif current_count == 1:
+        pykka.ActorRegistry.stop_all()
+    else:
+        sys.exit(-signal_code)
+
+def main(args=None):
+    global node_daemon
+    args = parse_cli(args)
+    config = load_config(args.config)
+
+    if not args.foreground:
+        daemon.DaemonContext().open()
+    for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
+        signal.signal(sigcode, shutdown_signal)
+
+    setup_logging(config.get('Logging', 'file'), **config.log_levels())
+    timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
+        launch_pollers(config)
+    node_daemon = NodeManagerDaemonActor.start(
+        job_queue_poller, arvados_node_poller, cloud_node_poller, timer,
+        config.new_arvados_client, config.new_cloud_client,
+        config.shutdown_windows(), config.getint('Daemon', 'max_nodes'),
+        config.getint('Daemon', 'poll_stale_after'),
+        config.getint('Daemon', 'node_stale_after')).proxy()
+
+    signal.pause()
+    daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
+    while not daemon_stopped():
+        time.sleep(1)
+    pykka.ActorRegistry.stop_all()
+
+
+if __name__ == '__main__':
+    main()
diff --git a/services/nodemanager/arvnodeman/nodelist.py b/services/nodemanager/arvnodeman/nodelist.py
new file mode 100644
index 0000000..dd15dc2
--- /dev/null
+++ b/services/nodemanager/arvnodeman/nodelist.py
@@ -0,0 +1,27 @@
+#!/usr/bin/env python
+
+import arvados.errors as arverror
+
+from . import clientactor
+from . import config
+
+class ArvadosNodeListMonitorActor(clientactor.RemotePollLoopActor):
+    CLIENT_ERRORS = config.ARVADOS_ERRORS
+    LOGGER_NAME = 'arvnodeman.arvados_nodes'
+
+    def _item_key(self, node):
+        return node['uuid']
+
+    def _send_request(self):
+        return self._client.nodes().list(limit=10000).execute()['items']
+
+
+class CloudNodeListMonitorActor(clientactor.RemotePollLoopActor):
+    CLIENT_ERRORS = config.CLOUD_ERRORS
+    LOGGER_NAME = 'arvnodeman.cloud_nodes'
+
+    def _item_key(self, node):
+        return node.id
+
+    def _send_request(self):
+        return self._client.list_nodes()
diff --git a/services/nodemanager/arvnodeman/timedcallback.py b/services/nodemanager/arvnodeman/timedcallback.py
new file mode 100644
index 0000000..ee65a71
--- /dev/null
+++ b/services/nodemanager/arvnodeman/timedcallback.py
@@ -0,0 +1,33 @@
+#!/usr/bin/env python
+
+import heapq
+import time
+
+import pykka
+
+from .config import actor_class
+
+class TimedCallBackActor(actor_class):
+    def __init__(self, max_sleep=1):
+        super(TimedCallBackActor, self).__init__()
+        self._proxy = self.actor_ref.proxy()
+        self.messages = []
+        self.max_sleep = max_sleep
+
+    def schedule(self, delivery_time, receiver, *args, **kwargs):
+        heapq.heappush(self.messages, (delivery_time, receiver, args, kwargs))
+        self._proxy.deliver()
+
+    def deliver(self):
+        if not self.messages:
+            return None
+        til_next = self.messages[0][0] - time.time()
+        if til_next < 0:
+            t, receiver, args, kwargs = heapq.heappop(self.messages)
+            try:
+                receiver(*args, **kwargs)
+            except pykka.ActorDeadError:
+                pass
+        else:
+            time.sleep(min(til_next, self.max_sleep))
+        self._proxy.deliver()
diff --git a/services/nodemanager/bin/arvados-node-manager b/services/nodemanager/bin/arvados-node-manager
new file mode 100644
index 0000000..fba3453
--- /dev/null
+++ b/services/nodemanager/bin/arvados-node-manager
@@ -0,0 +1,4 @@
+#!/usr/bin/env python
+
+from arvnodeman.launcher import main
+main()
diff --git a/services/nodemanager/doc/ec2.example.cfg b/services/nodemanager/doc/ec2.example.cfg
new file mode 100644
index 0000000..b2b2e2d
--- /dev/null
+++ b/services/nodemanager/doc/ec2.example.cfg
@@ -0,0 +1,119 @@
+# EC2 configuration for Arvados Node Manager.
+# All times are in seconds unless specified otherwise.
+
+[Daemon]
+# Node Manager will not start any compute nodes when at least this
+# many are running.
+max_nodes = 8
+
+# Poll EC2 nodes and Arvados for new information every N seconds.
+poll_time = 60
+
+# Polls have exponential backoff when services fail to respond.
+# This is the longest time to wait between polls.
+max_poll_time = 300
+
+# If Node Manager can't successfully poll a service for this long,
+# it will never start or stop compute nodes, on the assumption that its
+# information is too outdated.
+poll_stale_after = 600
+
+# "Node stale time" affects two related behaviors.
+# 1. If a compute node fails to ping an Arvados node for this long,
+# assume that it failed to bootstrap correctly, and consider it eligible
+# for shutdown.
+# This setting is only considered when the compute node is in a normal
+# shutdown window (see below).  You probably want to set this so that a
+# shutdown window opens just after time expires.
+# 2. When the Node Manager starts a new compute node, it will try to reuse
+# an Arvados node that hasn't been updated for this long.
+node_stale_after = 3000
+
+# File path for Certificate Authorities
+certs_file = /etc/ssl/certs/ca-certificates.crt
+
+[Logging]
+# Log file path
+file = /var/log/arvados/node-manager.log
+
+# Log level for most Node Manager messages.
+# Choose one of DEBUG, INFO, WARNING, ERROR, or CRITICAL.
+# WARNING lets you know when polling a service fails.
+# INFO additionally lets you know when a compute node is started or stopped.
+level = INFO
+
+# You can also set different log levels for specific libraries.
+# Pykka is the Node Manager's actor library.
+# Setting this to DEBUG will display tracebacks for uncaught
+# exceptions in the actors, but it's also very chatty.
+pykka = WARNING
+
+# Setting apiclient to INFO will log the URL of every Arvados API request.
+apiclient = WARNING
+
+[Arvados]
+host = zyxwv.arvadosapi.com
+token = ARVADOS_TOKEN
+timeout = 15
+
+# Accept an untrusted SSL certificate from the API server?
+insecure = no
+
+[Cloud]
+provider = ec2
+
+# It's usually most cost-effective to shut down compute nodes during narrow
+# windows of time.  For example, EC2 bills each node by the hour, so the best
+# time to shut down a node is right before a new hour of uptime starts.
+# Shutdown windows define these periods of time.  These are windows in
+# full minutes, separated by commas.  Counting from the time the node is
+# booted, the node WILL NOT shut down for N1 minutes; then it MAY shut down
+# for N2 minutes; then it WILL NOT shut down for N3 minutes; and so on.
+# For example, "54, 5, 1" means the node may shut down from the 54th to the
+# 59th minute of each hour of uptime.
+# Specify at least two windows.  You can add as many as you need beyond that.
+shutdown_windows = 54, 5, 1
+
+[Cloud Credentials]
+key = KEY
+secret = SECRET_KEY
+region = us-east-1
+
+[Cloud List]
+# This section defines filters that find compute nodes.
+# Tags that you specify here will automatically be added to nodes you create.
+# Replace colons in Amazon filters with underscores
+# (e.g., write "tag:mytag" as "tag_mytag").
+instance-state-name = running
+tag_arvados-class = dynamic-compute
+tag_cluster = zyxwv
+
+[Cloud Create]
+ex_keyname = string
+
+# New compute nodes will send pings to Arvados at this host.
+# You may specify a port, and use brackets to disambiguate IPv6 addresses.
+ping_host = hostname:port
+
+# File path for an SSH key that can log in to the compute node.
+ssh_key = filename
+
+# The EC2 IDs of the image and subnet compute nodes should use.
+image_id = string
+subnet_id = string
+
+# Comma-separated EC2 IDs for the security group(s) assigned to each
+# compute node.
+security_groups = sg1, sg2
+
+[Size t2.medium]
+# You can define any number of Size sections to list EC2 sizes you're
+# willing to use.  The Node Manager should boot the cheapest size(s) that
+# can run jobs in the queue (N.B.: defining more than one size has not been
+# tested yet).
+# Each size section MUST define the number of cores it has.  You may also
+# want to define the number of mebibytes of scratch space for Crunch jobs.
+# You can also override Amazon's provided data fields by setting the same
+# names here.
+cores = 2
+scratch = 100
\ No newline at end of file
diff --git a/services/nodemanager/doc/local.example.cfg b/services/nodemanager/doc/local.example.cfg
new file mode 100644
index 0000000..33ca3d6
--- /dev/null
+++ b/services/nodemanager/doc/local.example.cfg
@@ -0,0 +1,41 @@
+# You can use this configuration to run a development Node Manager for
+# testing.  It uses libcloud's dummy driver and your own development API server.
+# When new cloud nodes are created, you'll need to simulate the ping that
+# they send to the Arvados API server.  The easiest way I've found to do that
+# is through the API server Rails console: load the Node object, set its
+# IP address to 10.10.0.N (where N is the cloud node's ID), and save.
+
+[Daemon]
+max_nodes = 8
+poll_time = 15
+max_poll_time = 60
+poll_stale_after = 600
+node_stale_after = 300
+certs_file = /etc/ssl/certs/ca-certificates.crt
+
+[Logging]
+level = DEBUG
+pykka = DEBUG
+apiclient = WARNING
+
+[Arvados]
+host = localhost:3030
+# This is the token for the test fixture's admin user.
+token = 4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h
+insecure = yes
+timeout = 15
+
+[Cloud]
+provider = dummy
+shutdown_windows = 1, 1
+timeout = 15
+
+[Cloud Credentials]
+creds = dummycreds
+
+[Cloud List]
+[Cloud Create]
+
+[Size Medium]
+cores = 4
+scratch = 1234
diff --git a/services/nodemanager/setup.py b/services/nodemanager/setup.py
new file mode 100644
index 0000000..fabb883
--- /dev/null
+++ b/services/nodemanager/setup.py
@@ -0,0 +1,41 @@
+#!/usr/bin/env python
+
+import os
+import subprocess
+import time
+
+from setuptools import setup, find_packages
+
+SETUP_DIR = os.path.dirname(__file__) or "."
+cmd_opts = {'egg_info': {}}
+try:
+    git_tags = subprocess.check_output(
+        ['git', 'log', '--first-parent', '--max-count=1',
+         '--format=format:%ct %h', SETUP_DIR]).split()
+    assert len(git_tags) == 2
+except (AssertionError, OSError, subprocess.CalledProcessError):
+    pass
+else:
+    git_tags[0] = time.strftime('%Y%m%d%H%M%S', time.gmtime(int(git_tags[0])))
+    cmd_opts['egg_info']['tag_build'] = '.{}.{}'.format(*git_tags)
+
+setup(name='arvados-node-manager',
+      version='0.1',
+      description='Arvados compute node manager',
+      author='Arvados',
+      author_email='info at arvados.org',
+      url="https://arvados.org",
+      license='GNU Affero General Public License, version 3.0',
+      packages=find_packages(),
+      install_requires=[
+        'apache-libcloud',
+        'arvados-python-client',
+        'pykka',
+        'python-daemon',
+        ],
+      scripts=['bin/arvados-node-manager'],
+      test_suite='tests',
+      tests_require=['mock>=1.0'],
+      zip_safe=False,
+      options=cmd_opts,
+      )
diff --git a/services/nodemanager/tests/__init__.py b/services/nodemanager/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/services/nodemanager/tests/test_clientactor.py b/services/nodemanager/tests/test_clientactor.py
new file mode 100644
index 0000000..d09d342
--- /dev/null
+++ b/services/nodemanager/tests/test_clientactor.py
@@ -0,0 +1,125 @@
+#!/usr/bin/env python
+
+import unittest
+
+import mock
+import pykka
+
+import arvnodeman.clientactor as clientactor
+from . import testutil
+
+class RemotePollLoopActorTestCase(testutil.RemotePollLoopActorTestMixin,
+                                  unittest.TestCase):
+    class MockClientError(Exception):
+        pass
+
+    class TestActor(clientactor.RemotePollLoopActor):
+        LOGGER_NAME = 'arvnodeman.testpoll'
+
+        def _send_request(self):
+            return self._client()
+    TestActor.CLIENT_ERRORS = (MockClientError,)
+    TEST_CLASS = TestActor
+
+
+    def build_monitor(self, side_effect, *args, **kwargs):
+        super(RemotePollLoopActorTestCase, self).build_monitor(*args, **kwargs)
+        self.client.side_effect = side_effect
+
+    def test_poll_loop_starts_after_subscription(self):
+        self.build_monitor(['test1'])
+        self.monitor.subscribe(self.subscriber)
+        self.wait_for_call(self.subscriber)
+        self.subscriber.assert_called_with('test1')
+        self.wait_for_call(self.timer.schedule)
+
+    def test_poll_loop_continues_after_failure(self):
+        self.build_monitor(self.MockClientError)
+        self.monitor.subscribe(self.subscriber)
+        self.wait_for_call(self.timer.schedule)
+        self.assertTrue(self.monitor.actor_ref.is_alive(),
+                        "poll loop died after error")
+        self.assertFalse(self.subscriber.called,
+                         "poll loop notified subscribers after error")
+
+    def test_late_subscribers_get_responses(self):
+        self.build_monitor(['late_test'])
+        self.monitor.subscribe(lambda response: None)
+        self.monitor.subscribe(self.subscriber)
+        self.monitor.poll()
+        self.wait_for_call(self.subscriber)
+        self.subscriber.assert_called_with('late_test')
+
+    def test_survive_dead_subscriptions(self):
+        self.build_monitor(['survive1', 'survive2'])
+        dead_subscriber = mock.Mock(name='dead_subscriber')
+        dead_subscriber.side_effect = pykka.ActorDeadError
+        self.monitor.subscribe(dead_subscriber)
+        self.wait_for_call(dead_subscriber)
+        self.monitor.subscribe(self.subscriber)
+        self.monitor.poll()
+        self.wait_for_call(self.subscriber)
+        self.subscriber.assert_called_with('survive2')
+        self.assertTrue(self.monitor.actor_ref.is_alive(),
+                        "poll loop died from dead subscriber")
+
+    def test_no_subscriptions_by_key_without_support(self):
+        self.build_monitor([])
+        with self.assertRaises(AttributeError):
+            self.monitor.subscribe_to('key')
+
+
+class RemotePollLoopActorWithKeysTestCase(testutil.RemotePollLoopActorTestMixin,
+                                          unittest.TestCase):
+    class TestActor(RemotePollLoopActorTestCase.TestActor):
+        def _item_key(self, item):
+            return item['key']
+    TEST_CLASS = TestActor
+
+
+    def build_monitor(self, side_effect, *args, **kwargs):
+        super(RemotePollLoopActorWithKeysTestCase, self).build_monitor(
+            *args, **kwargs)
+        self.client.side_effect = side_effect
+
+    def test_key_subscription(self):
+        self.build_monitor([[{'key': 1}, {'key': 2}]])
+        self.monitor.subscribe_to(2, self.subscriber)
+        self.wait_for_call(self.subscriber)
+        self.subscriber.assert_called_with({'key': 2})
+
+    def test_survive_dead_key_subscriptions(self):
+        item = {'key': 3}
+        self.build_monitor([[item], [item]])
+        dead_subscriber = mock.Mock(name='dead_subscriber')
+        dead_subscriber.side_effect = pykka.ActorDeadError
+        self.monitor.subscribe_to(3, dead_subscriber)
+        self.wait_for_call(dead_subscriber)
+        self.monitor.subscribe_to(3, self.subscriber)
+        self.monitor.poll()
+        self.wait_for_call(self.subscriber)
+        self.subscriber.assert_called_with(item)
+        self.assertTrue(self.monitor.actor_ref.is_alive(),
+                        "poll loop died from dead key subscriber")
+
+    def test_mixed_subscriptions(self):
+        item = {'key': 4}
+        self.build_monitor([[item], [item]])
+        key_subscriber = mock.Mock(name='key_subscriber')
+        self.monitor.subscribe(self.subscriber)
+        self.monitor.subscribe_to(4, key_subscriber)
+        self.monitor.poll()
+        self.wait_for_call(self.subscriber)
+        self.subscriber.assert_called_with([item])
+        key_subscriber.assert_called_with(item)
+
+    def test_subscription_to_missing_key(self):
+        self.build_monitor([[]])
+        self.monitor.subscribe_to('nonesuch', self.subscriber)
+        self.wait_for_call(self.subscriber)
+        self.subscriber.assert_called_with(None)
+
+
+if __name__ == '__main__':
+    unittest.main()
+
diff --git a/services/nodemanager/tests/test_computenode.py b/services/nodemanager/tests/test_computenode.py
new file mode 100644
index 0000000..e628b13
--- /dev/null
+++ b/services/nodemanager/tests/test_computenode.py
@@ -0,0 +1,255 @@
+#!/usr/bin/env python
+
+import time
+import unittest
+
+import arvados.errors as arverror
+import httplib2
+import mock
+import pykka
+
+import arvnodeman.computenode as cnode
+from . import testutil
+
class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
    """Tests for the actor that creates and registers a new compute node."""

    def make_mocks(self, arvados_effect=None, cloud_effect=None):
        """Build mock API and cloud clients for the actor under test.

        arvados_effect becomes the mocks' side_effect, so it should be a
        list of responses, or an exception instance/class to raise.  The
        previous default passed a bare node dict: mock treats any iterable
        side_effect as a sequence of return values, so successive execute()
        calls would have returned the dict's *keys* rather than the node.
        """
        if arvados_effect is None:
            arvados_effect = [testutil.arvados_node_mock()]
        self.timer = testutil.MockTimer()
        self.api_client = mock.MagicMock(name='api_client')
        self.api_client.nodes().create().execute.side_effect = arvados_effect
        self.api_client.nodes().update().execute.side_effect = arvados_effect
        self.cloud_client = mock.MagicMock(name='cloud_client')
        self.cloud_client.create_node.return_value = testutil.cloud_node_mock(1)

    def make_actor(self, arv_node=None):
        """Start a setup actor proxy, building default mocks if needed."""
        if not hasattr(self, 'timer'):
            self.make_mocks()
        self.setup_actor = cnode.ComputeNodeSetupActor.start(
            self.timer, self.api_client, self.cloud_client,
            testutil.MockSize(1), arv_node).proxy()

    def test_creation_without_arvados_node(self):
        self.make_actor()
        self.wait_for_call(self.api_client.nodes().create().execute)
        self.wait_for_call(self.cloud_client.create_node)
        self.wait_for_call(self.cloud_client.post_create_node)

    def test_creation_with_arvados_node(self):
        # With a preexisting Arvados node, the actor updates it rather
        # than creating a new record.
        arv_node = testutil.arvados_node_mock()
        self.make_mocks([arv_node])
        self.make_actor(arv_node)
        self.wait_for_call(self.api_client.nodes().update().execute)
        self.wait_for_call(self.cloud_client.create_node)
        self.wait_for_call(self.cloud_client.post_create_node)
        self.cloud_client.post_create_node.assert_called_with(
            self.cloud_client.create_node(), arv_node)

    def test_failed_calls_retried(self):
        # First API call raises; the actor should retry and proceed to
        # create the cloud node once the API succeeds.
        self.make_mocks([
                arverror.ApiError(httplib2.Response({'status': '500'}), ""),
                testutil.arvados_node_mock(),
                ])
        self.make_actor()
        self.wait_for_call(self.cloud_client.create_node)

    def test_stop_when_no_cloud_node(self):
        self.make_mocks(
            arverror.ApiError(httplib2.Response({'status': '500'}), ""))
        self.make_actor()
        self.wait_for_call(self.api_client.nodes().create().execute)
        self.setup_actor.stop_if_no_cloud_node()
        self.assertTrue(
            self.setup_actor.actor_ref.actor_stopped.wait(self.TIMEOUT))

    def test_no_stop_when_cloud_node(self):
        self.make_actor()
        self.wait_for_call(self.cloud_client.create_node)
        self.setup_actor.stop_if_no_cloud_node().get(self.TIMEOUT)
        self.assertFalse(self.setup_actor.actor_ref.actor_stopped.is_set())

    def test_subscribe(self):
        # Subscribe while the API is still failing, then let it succeed;
        # the subscriber should be called with the setup actor itself.
        self.make_mocks(
            arverror.ApiError(httplib2.Response({'status': '500'}), ""))
        self.make_actor()
        subscriber = mock.Mock(name='subscriber_mock')
        self.setup_actor.subscribe(subscriber)
        self.api_client.nodes().create().execute.side_effect = [
            testutil.arvados_node_mock()]
        self.wait_for_call(subscriber)
        self.assertEqual(self.setup_actor.actor_ref.actor_urn,
                         subscriber.call_args[0][0].actor_ref.actor_urn)

    def test_late_subscribe(self):
        # Subscribing after setup already finished still triggers an
        # immediate notification.
        self.make_actor()
        subscriber = mock.Mock(name='subscriber_mock')
        self.wait_for_call(self.cloud_client.create_node)
        self.setup_actor.subscribe(subscriber)
        self.wait_for_call(subscriber)
        self.assertEqual(self.setup_actor.actor_ref.actor_urn,
                         subscriber.call_args[0][0].actor_ref.actor_urn)
+
+
class ComputeNodeShutdownActorTestCase(testutil.ActorTestMixin,
                                       unittest.TestCase):
    """Tests for the actor that destroys a cloud compute node."""

    def make_mocks(self, cloud_node=None):
        """Prepare a mock timer, cloud client, and target cloud node."""
        self.timer = testutil.MockTimer()
        self.cloud_client = mock.MagicMock(name='cloud_client')
        self.cloud_node = (testutil.cloud_node_mock()
                           if cloud_node is None else cloud_node)

    def make_actor(self, arv_node=None):
        """Start a shutdown actor proxy over the mocked collaborators."""
        if not hasattr(self, 'timer'):
            self.make_mocks()
        self.shutdown_actor = cnode.ComputeNodeShutdownActor.start(
            self.timer, self.cloud_client, self.cloud_node).proxy()

    def test_easy_shutdown(self):
        """With no obstacles, the actor promptly destroys the node."""
        self.make_actor()
        self.wait_for_call(self.cloud_client.destroy_node)
+
+
@mock.patch('time.time', return_value=1)
class ShutdownTimerTestCase(unittest.TestCase):
    """Tests for ShutdownTimer's repeating closed/open windows.

    time.time is patched at the class level, so each test advances the
    clock by mutating time_mock.return_value.
    """

    def test_two_length_window(self, time_mock):
        # Windows of [8, 2] minutes: closed for 8, open for 2, repeating.
        timer = cnode.ShutdownTimer(time_mock.return_value, [8, 2])
        self.assertEqual(481, timer.next_opening())
        self.assertFalse(timer.window_open())
        time_mock.return_value += 500
        self.assertEqual(1081, timer.next_opening())
        self.assertTrue(timer.window_open())
        time_mock.return_value += 200
        self.assertEqual(1081, timer.next_opening())
        self.assertFalse(timer.window_open())

    def test_three_length_window(self, time_mock):
        # Windows of [6, 3, 1]: the trailing 1 extends the closed period.
        timer = cnode.ShutdownTimer(time_mock.return_value, [6, 3, 1])
        self.assertEqual(361, timer.next_opening())
        self.assertFalse(timer.window_open())
        time_mock.return_value += 400
        self.assertEqual(961, timer.next_opening())
        self.assertTrue(timer.window_open())
        time_mock.return_value += 200
        self.assertEqual(961, timer.next_opening())
        self.assertFalse(timer.window_open())
+
+
class ComputeNodeActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
    """Tests for the actor that tracks one paired cloud/Arvados node."""

    class MockShutdownTimer(object):
        def _set_state(self, is_open, next_opening):
            # Replace the timer's query methods with canned answers.
            self.window_open = lambda: is_open
            self.next_opening = lambda: next_opening


    def make_mocks(self, node_num):
        self.shutdowns = self.MockShutdownTimer()
        self.shutdowns._set_state(False, 300)
        self.timer = mock.MagicMock(name='timer_mock')
        self.cloud_mock = testutil.cloud_node_mock(node_num)
        self.subscriber = mock.Mock(name='subscriber_mock')

    def make_actor(self, node_num=1, arv_node=None, start_time=None):
        """Start a node actor; start_time defaults to now.

        Bug fix: a stray unconditional `start_time = time.time()` after the
        None check used to clobber any caller-supplied start_time, so tests
        passing start_time=0 never actually exercised an old boot time.
        """
        if not hasattr(self, 'cloud_mock'):
            self.make_mocks(node_num)
        if start_time is None:
            start_time = time.time()
        self.node_actor = cnode.ComputeNodeActor.start(
            self.cloud_mock, start_time, self.shutdowns, self.timer,
            arv_node).proxy()
        self.node_actor.subscribe(self.subscriber)

    def test_init_shutdown_scheduling(self):
        self.make_actor()
        self.wait_for_call(self.timer.schedule)
        self.assertEqual(300, self.timer.schedule.call_args[0][0])

    def test_shutdown_subscription(self):
        self.make_actor()
        self.shutdowns._set_state(True, 600)
        self.node_actor.consider_shutdown()
        self.wait_for_call(self.subscriber)
        self.assertEqual(self.node_actor.actor_ref.actor_urn,
                         self.subscriber.call_args[0][0].actor_ref.actor_urn)

    def test_shutdown_without_arvados_node(self):
        self.make_actor()
        self.shutdowns._set_state(True, 600)
        self.node_actor.consider_shutdown()
        self.wait_for_call(self.subscriber)

    def test_no_shutdown_without_arvados_node_and_old_cloud_node(self):
        # A node booted long ago (start_time=0) without an Arvados record
        # must not be offered for shutdown.
        self.make_actor(start_time=0)
        self.shutdowns._set_state(True, 600)
        self.node_actor.consider_shutdown()
        self.assertFalse(self.subscriber.called)

    def check_shutdown_rescheduled(self, window_open, next_window,
                                   schedule_time=None):
        # Shared assertion helper: shutdown is declined and a new check is
        # scheduled (optionally at an exact time).
        self.shutdowns._set_state(window_open, next_window)
        self.timer.schedule.reset_mock()
        self.node_actor.consider_shutdown()
        self.wait_for_call(self.timer.schedule)
        if schedule_time is not None:
            self.assertEqual(schedule_time, self.timer.schedule.call_args[0][0])
        self.assertFalse(self.subscriber.called)

    def test_shutdown_window_close_scheduling(self):
        self.make_actor()
        self.check_shutdown_rescheduled(False, 600, 600)

    def test_no_shutdown_when_node_running_job(self):
        self.make_actor(4, testutil.arvados_node_mock(4, job_uuid=True))
        self.check_shutdown_rescheduled(True, 600)

    def test_no_shutdown_when_node_state_unknown(self):
        self.make_actor(5, testutil.arvados_node_mock(5, info={}))
        self.check_shutdown_rescheduled(True, 600)

    def test_no_shutdown_when_node_state_stale(self):
        self.make_actor(6, testutil.arvados_node_mock(6, age=900))
        self.check_shutdown_rescheduled(True, 600)

    def test_arvados_node_match(self):
        self.make_actor(2)
        arv_node = testutil.arvados_node_mock(2)
        pair_future = self.node_actor.offer_arvados_pair(arv_node)
        self.assertEqual(self.cloud_mock.id, pair_future.get(self.TIMEOUT))

    def test_arvados_node_mismatch(self):
        self.make_actor(3)
        arv_node = testutil.arvados_node_mock(1)
        pair_future = self.node_actor.offer_arvados_pair(arv_node)
        self.assertIsNone(pair_future.get(self.TIMEOUT))

    def test_update_cloud_node(self):
        # A fresh mock with the same id should replace the stored record.
        self.make_actor(1)
        self.make_mocks(2)
        self.cloud_mock.id = '1'
        self.node_actor.update_cloud_node(self.cloud_mock)
        current_cloud = self.node_actor.cloud_node.get(self.TIMEOUT)
        self.assertEqual([testutil.ip_address_mock(2)],
                         current_cloud.private_ips)

    def test_missing_cloud_node_update(self):
        # A None update must leave the existing record untouched.
        self.make_actor(1)
        self.node_actor.update_cloud_node(None)
        current_cloud = self.node_actor.cloud_node.get(self.TIMEOUT)
        self.assertEqual([testutil.ip_address_mock(1)],
                         current_cloud.private_ips)

    def test_update_arvados_node(self):
        self.make_actor(3)
        job_uuid = 'zzzzz-jjjjj-updatejobnode00'
        new_arvados = testutil.arvados_node_mock(3, job_uuid)
        self.node_actor.update_arvados_node(new_arvados)
        current_arvados = self.node_actor.arvados_node.get(self.TIMEOUT)
        self.assertEqual(job_uuid, current_arvados['job_uuid'])

    def test_missing_arvados_node_update(self):
        self.make_actor(4, testutil.arvados_node_mock(4))
        self.node_actor.update_arvados_node(None)
        current_arvados = self.node_actor.arvados_node.get(self.TIMEOUT)
        self.assertEqual(testutil.ip_address_mock(4),
                         current_arvados['ip_address'])
diff --git a/services/nodemanager/tests/test_computenode_ec2.py b/services/nodemanager/tests/test_computenode_ec2.py
new file mode 100644
index 0000000..20f1d1d
--- /dev/null
+++ b/services/nodemanager/tests/test_computenode_ec2.py
@@ -0,0 +1,70 @@
+#!/usr/bin/env python
+
+import time
+import unittest
+
+import mock
+
+import arvnodeman.computenode.ec2 as ec2
+from . import testutil
+
class EC2ComputeNodeDriverTestCase(unittest.TestCase):
    """Tests for the EC2 ComputeNodeDriver wrapper around libcloud."""

    def setUp(self):
        self.driver_mock = mock.MagicMock(name='driver_mock')

    def new_driver(self, auth_kwargs=None, list_kwargs=None, create_kwargs=None):
        """Build a ComputeNodeDriver with the mocked libcloud driver class.

        Bug fix: the old signature used mutable `{}` defaults and then
        called setdefault() on them, so 'ping_host' leaked into the shared
        default dict (and into any caller-supplied dict).  Copy before
        mutating and default to None instead.
        """
        create_kwargs = dict(create_kwargs or {})
        create_kwargs.setdefault('ping_host', '100::')
        return ec2.ComputeNodeDriver(
            auth_kwargs or {}, list_kwargs or {}, create_kwargs,
            driver_class=self.driver_mock)

    def test_driver_instantiation(self):
        kwargs = {'key': 'testkey'}
        self.new_driver(auth_kwargs=kwargs)
        self.assertTrue(self.driver_mock.called)
        self.assertEqual(kwargs, self.driver_mock.call_args[1])

    def test_list_kwargs_become_filters(self):
        # We're also testing tag name translation.
        driver = self.new_driver(list_kwargs={'tag_test': 'true'})
        driver.list_nodes()
        list_method = self.driver_mock().list_nodes
        self.assertTrue(list_method.called)
        self.assertEqual({'tag:test': 'true'},
                          list_method.call_args[1].get('ex_filters'))

    def test_create_location_loaded_at_initialization(self):
        self.new_driver(create_kwargs={'location': 'testregion'})
        # Assert .called, not the attribute itself: a Mock attribute is
        # always truthy, so the old assertion could never fail.
        self.assertTrue(self.driver_mock().list_locations.called)

    def test_create_image_loaded_at_initialization(self):
        self.new_driver(create_kwargs={'image': 'testimage'})
        self.assertTrue(self.driver_mock().list_images.called)

    def test_create_includes_ping_secret(self):
        arv_node = testutil.arvados_node_mock(info={'ping_secret': 'ssshh'})
        driver = self.new_driver()
        driver.create_node(testutil.MockSize(1), arv_node)
        create_method = self.driver_mock().create_node
        self.assertTrue(create_method.called)
        self.assertIn('ping_secret=ssshh',
                      create_method.call_args[1].get('ex_userdata',
                                                     'arg missing'))

    def test_post_create_tags_node(self):
        # The driver derives create kwargs from the Arvados record: list
        # filter tags become instance metadata; hostname+domain the name.
        arv_node = testutil.arvados_node_mock(8)
        driver = self.new_driver(list_kwargs={'tag:list': 'test'})
        self.assertEqual({'ex_metadata': {'list': 'test'},
                          'name': 'compute8.zzzzz.arvadosapi.com'},
                         driver.arvados_create_kwargs(arv_node))

    def test_node_create_time(self):
        # node_start_time should parse the ISO launch_time back to epoch
        # seconds.
        refsecs = int(time.time())
        reftuple = time.gmtime(refsecs)
        node = testutil.cloud_node_mock()
        node.extra = {'launch_time': time.strftime('%Y-%m-%dT%H:%M:%S.000Z',
                                                   reftuple)}
        self.assertEqual(refsecs, ec2.ComputeNodeDriver.node_start_time(node))
diff --git a/services/nodemanager/tests/test_config.py b/services/nodemanager/tests/test_config.py
new file mode 100644
index 0000000..f726e6e
--- /dev/null
+++ b/services/nodemanager/tests/test_config.py
@@ -0,0 +1,63 @@
+#!/usr/bin/env python
+
+import io
+import logging
+import unittest
+
+import arvnodeman.config as nmconfig
+
class NodeManagerConfigTestCase(unittest.TestCase):
    """Tests for parsing and interpreting the node manager config file."""

    TEST_CONFIG = u"""
[Cloud]
provider = dummy
shutdown_windows = 52, 6, 2

[Cloud Credentials]
creds = dummy_creds

[Cloud List]
[Cloud Create]

[Size 1]
cores = 1

[Logging]
file = /dev/null
level = DEBUG
testlogger = INFO
"""

    def load_config(self, config=None, config_str=None):
        """Parse config_str (TEST_CONFIG by default) into config and return it."""
        target = nmconfig.NodeManagerConfig() if config is None else config
        source = self.TEST_CONFIG if config_str is None else config_str
        with io.StringIO(source) as stream:
            target.readfp(stream)
        return target

    def test_seeded_defaults(self):
        """A fresh config already has core sections but no Size sections."""
        section_names = set(nmconfig.NodeManagerConfig().sections())
        self.assertIn('Arvados', section_names)
        self.assertIn('Daemon', section_names)
        self.assertFalse(
            any(name.startswith('Size ') for name in section_names))

    def test_list_sizes(self):
        """Size sections pair cloud sizes with their extra attributes."""
        config = self.load_config()
        cloud_sizes = config.new_cloud_client().list_sizes()
        sizes = config.node_sizes(cloud_sizes)
        self.assertEqual(1, len(sizes))
        size, extra = sizes[0]
        self.assertEqual('Small', size.name)
        self.assertEqual(1, extra['cores'])

    def test_shutdown_windows(self):
        """shutdown_windows parses the comma-separated list to integers."""
        self.assertEqual([52, 6, 2], self.load_config().shutdown_windows())

    def test_log_levels(self):
        """Logging section entries map logger names to logging constants."""
        expected = {'level': logging.DEBUG, 'testlogger': logging.INFO}
        self.assertEqual(expected, self.load_config().log_levels())
diff --git a/services/nodemanager/tests/test_daemon.py b/services/nodemanager/tests/test_daemon.py
new file mode 100644
index 0000000..28be5fa
--- /dev/null
+++ b/services/nodemanager/tests/test_daemon.py
@@ -0,0 +1,138 @@
+#!/usr/bin/env python
+
+import time
+import unittest
+
+import mock
+
+import arvnodeman.daemon as nmdaemon
+from . import testutil
+
class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                     unittest.TestCase):
    """Tests for the daemon actor that reconciles the server wishlist
    against booted/booting cloud nodes.
    """

    def make_daemon(self, cloud_nodes=[], arvados_nodes=[], want_sizes=[]):
        # NOTE(review): the [] defaults are only passed along to the daemon,
        # never mutated here, so the shared-mutable-default pitfall does not
        # bite in this helper.
        for name in ['cloud_nodes', 'arvados_nodes', 'server_wishlist']:
            setattr(self, name + '_poller', mock.MagicMock(name=name + '_mock'))
        self.arv_factory = mock.MagicMock(name='arvados_mock')
        self.cloud_factory = mock.MagicMock(name='cloud_mock')
        self.cloud_factory().node_start_time.return_value = time.time()
        self.timer = testutil.MockTimer()
        self.node_factory = mock.MagicMock(name='factory_mock')
        self.node_setup = mock.MagicMock(name='setup_mock')
        self.node_shutdown = mock.MagicMock(name='shutdown_mock')
        # Positional args after the factories: shutdown windows, max node
        # count, then two intervals -- presumably poll-stale and node-stale
        # times in seconds; confirm against NodeManagerDaemonActor.__init__.
        self.daemon = nmdaemon.NodeManagerDaemonActor.start(
            self.server_wishlist_poller, self.arvados_nodes_poller,
            self.cloud_nodes_poller, self.timer,
            self.arv_factory, self.cloud_factory,
            [54, 5, 1], 8, 600, 3600,
            self.node_setup, self.node_shutdown, self.node_factory).proxy()
        # Passing None for any of these skips the corresponding update.
        if cloud_nodes is not None:
            self.daemon.update_cloud_nodes(cloud_nodes)
        if arvados_nodes is not None:
            self.daemon.update_arvados_nodes(arvados_nodes)
        if want_sizes is not None:
            self.daemon.update_server_wishlist(want_sizes)

    def test_easy_node_creation(self):
        """A wishlist entry with no nodes triggers a setup actor."""
        size = testutil.MockSize(1)
        self.make_daemon(want_sizes=[size])
        self.wait_for_call(self.node_setup.start)

    def test_node_pairing(self):
        """Matching cloud and Arvados records are offered as a pair."""
        cloud_node = testutil.cloud_node_mock(1)
        arv_node = testutil.arvados_node_mock(1)
        self.make_daemon([cloud_node], [arv_node])
        self.wait_for_call(self.node_factory.start)
        pair_func = self.node_factory.start().proxy().offer_arvados_pair
        self.wait_for_call(pair_func)
        pair_func.assert_called_with(arv_node)

    def test_node_pairing_after_arvados_update(self):
        """An unpaired node is re-offered when its Arvados record updates."""
        cloud_node = testutil.cloud_node_mock(2)
        arv_node = testutil.arvados_node_mock(2, ip_address=None)
        self.make_daemon([cloud_node], None)
        pair_func = self.node_factory.start().proxy().offer_arvados_pair
        # First offer is declined (returns None) ...
        pair_func().get.return_value = None
        self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
        pair_func.assert_called_with(arv_node)

        # ... then accepted once the record has an IP address.
        pair_func().get.return_value = cloud_node.id
        pair_func.reset_mock()
        arv_node = testutil.arvados_node_mock(2)
        self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
        pair_func.assert_called_with(arv_node)

    def test_old_arvados_node_not_double_assigned(self):
        """One stale Arvados record is reused for at most one new node."""
        arv_node = testutil.arvados_node_mock(3, age=9000)
        size = testutil.MockSize(3)
        self.make_daemon(arvados_nodes=[arv_node], want_sizes=[size, size])
        node_starter = self.node_setup.start
        # Busy-wait (bounded by TIMEOUT) for both setups to start.
        deadline = time.time() + self.TIMEOUT
        while (time.time() < deadline) and (node_starter.call_count < 2):
            time.sleep(.1)
        self.assertEqual(2, node_starter.call_count)
        used_nodes = [call[1].get('arvados_node')
                      for call in node_starter.call_args_list]
        self.assertIn(arv_node, used_nodes)
        self.assertIn(None, used_nodes)

    def test_node_count_satisfied(self):
        """No setup starts when existing nodes already cover the wishlist."""
        self.make_daemon([testutil.cloud_node_mock()])
        self.daemon.update_server_wishlist(
            [testutil.MockSize(1)]).get(self.TIMEOUT)
        self.assertFalse(self.node_setup.called)

    def test_booting_nodes_counted(self):
        """Nodes still booting count toward the wishlist total."""
        cloud_node = testutil.cloud_node_mock(1)
        arv_node = testutil.arvados_node_mock(1)
        server_wishlist = [testutil.MockSize(1)] * 2
        self.make_daemon([cloud_node], [arv_node], server_wishlist)
        self.wait_for_call(self.node_setup.start)
        self.node_setup.reset_mock()
        self.daemon.update_server_wishlist(server_wishlist).get(self.TIMEOUT)
        self.assertFalse(self.node_setup.called)

    def test_booting_nodes_shut_down(self):
        """A booting node is cancelled when the wishlist empties."""
        self.make_daemon(want_sizes=[testutil.MockSize(1)])
        self.wait_for_call(self.node_setup.start)
        self.daemon.update_server_wishlist([])
        self.wait_for_call(
            self.node_setup.start().proxy().stop_if_no_cloud_node)

    def test_shutdown_declined_at_wishlist_capacity(self):
        """A shutdown offer is refused while the node is still wanted."""
        cloud_node = testutil.cloud_node_mock(1)
        size = testutil.MockSize(1)
        self.make_daemon(cloud_nodes=[cloud_node], want_sizes=[size])
        node_actor = self.node_factory().proxy()
        self.daemon.shutdown_offer(node_actor).get(self.TIMEOUT)
        self.assertFalse(node_actor.shutdown.called)

    def test_shutdown_accepted_below_capacity(self):
        """A shutdown offer is accepted for an unneeded node."""
        self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
        node_actor = self.node_factory().proxy()
        self.daemon.shutdown_offer(node_actor)
        self.wait_for_call(self.node_shutdown.start)

    def test_clean_shutdown_waits_for_node_setup_finish(self):
        """Daemon shutdown waits for in-flight node setups to resolve."""
        self.make_daemon(want_sizes=[testutil.MockSize(1)])
        self.wait_for_call(self.node_setup.start)
        new_node = self.node_setup.start().proxy()
        self.daemon.shutdown()
        self.wait_for_call(new_node.stop_if_no_cloud_node)
        self.daemon.node_up(new_node)
        self.wait_for_call(new_node.stop)
        self.assertTrue(
            self.daemon.actor_ref.actor_stopped.wait(self.TIMEOUT))

    def test_wishlist_ignored_after_shutdown(self):
        """Wishlist updates after shutdown must not start new nodes."""
        size = testutil.MockSize(2)
        self.make_daemon(want_sizes=[size])
        node_starter = self.node_setup.start
        self.wait_for_call(node_starter)
        node_starter.reset_mock()
        self.daemon.shutdown()
        self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
        # Send another message and wait for a response, to make sure all
        # internal messages generated by the wishlist update are processed.
        self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
        self.assertFalse(node_starter.called)
diff --git a/services/nodemanager/tests/test_jobqueue.py b/services/nodemanager/tests/test_jobqueue.py
new file mode 100644
index 0000000..0ff9ebb
--- /dev/null
+++ b/services/nodemanager/tests/test_jobqueue.py
@@ -0,0 +1,72 @@
+#!/usr/bin/env python
+
+import unittest
+
+import arvnodeman.jobqueue as jobqueue
+from . import testutil
+
class ServerCalculatorTestCase(unittest.TestCase):
    """Unit tests for ServerCalculator's queue-to-server-count logic."""

    def make_calculator(self, factors, **kwargs):
        """Build a calculator whose sizes scale with the given factors."""
        size_list = [(testutil.MockSize(f), {'cores': f}) for f in factors]
        return jobqueue.ServerCalculator(size_list, **kwargs)

    def calculate(self, servcalc, *constraints):
        """Run the calculator over a queue built from constraint dicts."""
        queue = [{'runtime_constraints': c} for c in constraints]
        return servcalc.servers_for_queue(queue)

    def test_empty_queue_needs_no_servers(self):
        calculator = self.make_calculator([1])
        self.assertEqual([], calculator.servers_for_queue([]))

    def test_easy_server_count(self):
        calculator = self.make_calculator([1])
        result = self.calculate(calculator, {'min_nodes': 3})
        self.assertEqual(3, len(result))

    def test_implicit_server_count(self):
        # A job without min_nodes still needs one server.
        calculator = self.make_calculator([1])
        result = self.calculate(calculator, {}, {'min_nodes': 3})
        self.assertEqual(4, len(result))

    def test_bad_min_nodes_override(self):
        # Invalid min_nodes values fall back to one server per job.
        calculator = self.make_calculator([1])
        result = self.calculate(calculator,
                                {'min_nodes': -2}, {'min_nodes': 'foo'})
        self.assertEqual(2, len(result))

    def test_ignore_unsatisfiable_jobs(self):
        # Jobs whose constraints no size (or max_nodes) can meet are skipped.
        calculator = self.make_calculator([1], max_nodes=9)
        result = self.calculate(calculator,
                                {'min_cores_per_node': 2},
                                {'min_ram_mb_per_node': 256},
                                {'min_nodes': 6},
                                {'min_nodes': 12},
                                {'min_scratch_mb_per_node': 200})
        self.assertEqual(6, len(result))
+
+
class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
                                   unittest.TestCase):
    """Poll-loop tests for the job queue monitor actor."""

    TEST_CLASS = jobqueue.JobQueueMonitorActor

    class MockCalculator(object):
        """Trivial calculator: one MockSize per queue entry."""
        @staticmethod
        def servers_for_queue(queue):
            return [testutil.MockSize(n) for n in queue]


    def build_monitor(self, side_effect, *args, **kwargs):
        # Feed canned job-queue API responses to the monitor's client mock.
        super(JobQueueMonitorActorTestCase, self).build_monitor(*args, **kwargs)
        self.client.jobs().queue().execute.side_effect = side_effect

    def test_subscribers_get_server_lists(self):
        """Subscribers receive the calculator's server list, not raw jobs."""
        self.build_monitor([{'items': [1, 2]}], self.MockCalculator())
        self.monitor.subscribe(self.subscriber)
        self.wait_for_call(self.subscriber)
        self.subscriber.assert_called_with([testutil.MockSize(1),
                                            testutil.MockSize(2)])
+
+
# Allow running this test module directly from the command line.
if __name__ == '__main__':
    unittest.main()
+
diff --git a/services/nodemanager/tests/test_nodelist.py b/services/nodemanager/tests/test_nodelist.py
new file mode 100644
index 0000000..f4a3321
--- /dev/null
+++ b/services/nodemanager/tests/test_nodelist.py
@@ -0,0 +1,54 @@
+#!/usr/bin/env python
+
+import unittest
+
+import arvnodeman.nodelist as nodelist
+from . import testutil
+
class ArvadosNodeListMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
                                          unittest.TestCase):
    """Poll-loop tests for the Arvados node list monitor."""

    TEST_CLASS = nodelist.ArvadosNodeListMonitorActor

    def build_monitor(self, side_effect, *args, **kwargs):
        """Build the monitor and feed it canned node-list API responses."""
        super(ArvadosNodeListMonitorActorTestCase, self).build_monitor(
            *args, **kwargs)
        self.client.nodes().list().execute.side_effect = side_effect

    def test_uuid_is_subscription_key(self):
        """A subscriber keyed by node UUID receives that node's record."""
        node = testutil.arvados_node_mock()
        response = {'items': [node]}
        self.build_monitor([response])
        self.monitor.subscribe_to(node['uuid'], self.subscriber)
        self.wait_for_call(self.subscriber)
        self.subscriber.assert_called_with(node)
+
+
class CloudNodeListMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
                                        unittest.TestCase):
    """Poll-loop tests for the cloud node list monitor."""

    TEST_CLASS = nodelist.CloudNodeListMonitorActor

    class MockNode(object):
        """Minimal stand-in for a libcloud Node object."""
        def __init__(self, count):
            self.id = str(count)
            self.name = 'test{}.example.com'.format(count)
            self.private_ips = ['10.0.0.{}'.format(count)]
            self.public_ips = []
            self.size = None
            self.state = 0


    def build_monitor(self, side_effect, *args, **kwargs):
        """Build the monitor and feed it canned list_nodes responses."""
        super(CloudNodeListMonitorActorTestCase, self).build_monitor(
            *args, **kwargs)
        self.client.list_nodes.side_effect = side_effect

    def test_id_is_subscription_key(self):
        """A subscriber keyed by node id receives that node object."""
        mock_node = self.MockNode(1)
        self.build_monitor([[mock_node]])
        self.monitor.subscribe_to('1', self.subscriber)
        self.wait_for_call(self.subscriber)
        self.subscriber.assert_called_with(mock_node)
+
+
# Allow running this test module directly from the command line.
if __name__ == '__main__':
    unittest.main()
+
diff --git a/services/nodemanager/tests/test_timedcallback.py b/services/nodemanager/tests/test_timedcallback.py
new file mode 100644
index 0000000..7c4c43b
--- /dev/null
+++ b/services/nodemanager/tests/test_timedcallback.py
@@ -0,0 +1,53 @@
+#!/usr/bin/env python
+
+import time
+import unittest
+
+import mock
+import pykka
+
+import arvnodeman.timedcallback as timedcallback
+from . import testutil
+
@testutil.no_sleep
class TimedCallBackActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
    """Tests for TimedCallBackActor's delayed callback delivery."""

    def test_immediate_turnaround(self):
        # A callback scheduled in the past fires right away.
        future = self.FUTURE_CLASS()
        deliverer = timedcallback.TimedCallBackActor.start().proxy()
        deliverer.schedule(time.time() - 1, future.set, 'immediate')
        self.assertEqual('immediate', future.get(self.TIMEOUT))

    def test_delayed_turnaround(self):
        # A future callback fires only after the (mocked) clock passes it.
        future = self.FUTURE_CLASS()
        with mock.patch('time.time', return_value=0) as mock_now:
            deliverer = timedcallback.TimedCallBackActor.start().proxy()
            deliverer.schedule(1, future.set, 'delayed')
            self.assertRaises(pykka.Timeout, future.get, .5)
            mock_now.return_value = 2
            self.assertEqual('delayed', future.get(self.TIMEOUT))

    def test_out_of_order_scheduling(self):
        # Callbacks fire in time order regardless of scheduling order.
        future1 = self.FUTURE_CLASS()
        future2 = self.FUTURE_CLASS()
        with mock.patch('time.time', return_value=1.5) as mock_now:
            deliverer = timedcallback.TimedCallBackActor.start().proxy()
            deliverer.schedule(2, future2.set, 'second')
            deliverer.schedule(1, future1.set, 'first')
            self.assertEqual('first', future1.get(self.TIMEOUT))
            self.assertRaises(pykka.Timeout, future2.get, .1)
            mock_now.return_value = 3
            self.assertEqual('second', future2.get(self.TIMEOUT))

    def test_dead_actors_ignored(self):
        # Delivering to a dead actor must not kill the deliverer.
        receiver = mock.Mock(name='dead_actor', spec=pykka.ActorRef)
        receiver.tell.side_effect = pykka.ActorDeadError
        deliverer = timedcallback.TimedCallBackActor.start().proxy()
        deliverer.schedule(time.time() - 1, receiver.tell, 'error')
        self.wait_for_call(receiver.tell)
        receiver.tell.assert_called_with('error')
        self.assertTrue(deliverer.actor_ref.is_alive(), "deliverer died")
+
+
# Allow running this test module directly from the command line.
if __name__ == '__main__':
    unittest.main()
+
diff --git a/services/nodemanager/tests/testutil.py b/services/nodemanager/tests/testutil.py
new file mode 100644
index 0000000..d67cf1c
--- /dev/null
+++ b/services/nodemanager/tests/testutil.py
@@ -0,0 +1,79 @@
+#!/usr/bin/env python
+
+import time
+
+import mock
+import pykka
+
+no_sleep = mock.patch('time.sleep', lambda n: None)
+
+def arvados_node_mock(node_num=99, job_uuid=None, age=0, **kwargs):
+    if job_uuid is True:
+        job_uuid = 'zzzzz-jjjjj-jobjobjobjobjob'
+    slurm_state = 'idle' if (job_uuid is None) else 'alloc'
+    node = {'uuid': 'zzzzz-yyyyy-12345abcde67890',
+            'created_at': '2014-01-01T01:02:03Z',
+            'modified_at': time.strftime('%Y-%m-%dT%H:%M:%SZ',
+                                         time.gmtime(time.time() - age)),
+            'hostname': 'compute{}'.format(node_num),
+            'domain': 'zzzzz.arvadosapi.com',
+            'ip_address': ip_address_mock(node_num),
+            'job_uuid': job_uuid,
+            'info': {'slurm_state': slurm_state}}
+    node.update(kwargs)
+    return node
+
+def cloud_node_mock(node_num=99):
+    node = mock.NonCallableMagicMock(
+        ['id', 'name', 'state', 'public_ips', 'private_ips', 'driver', 'size',
+         'image', 'extra'],
+        name='cloud_node')
+    node.id = str(node_num)
+    node.name = node.id
+    node.public_ips = []
+    node.private_ips = [ip_address_mock(node_num)]
+    return node
+
+def ip_address_mock(last_octet):
+    return '10.20.30.{}'.format(last_octet)
+
+class MockSize(object):
+    def __init__(self, factor):
+        self.id = 'z{}.test'.format(factor)
+        self.name = self.id
+        self.ram = 128 * factor
+        self.disk = 100 * factor
+        self.bandwidth = 16 * factor
+        self.price = float(factor)
+        self.extra = {}
+
+    def __eq__(self, other):
+        return self.id == other.id
+
+
+class MockTimer(object):
+    def schedule(self, want_time, callback, *args, **kwargs):
+        return callback(*args, **kwargs)
+
+
+class ActorTestMixin(object):
+    FUTURE_CLASS = pykka.ThreadingFuture
+    TIMEOUT = 3
+
+    def tearDown(self):
+        pykka.ActorRegistry.stop_all()
+
+    def wait_for_call(self, mock_func, timeout=TIMEOUT):
+        deadline = time.time() + timeout
+        while (not mock_func.called) and (time.time() < deadline):
+            time.sleep(.1)
+        self.assertTrue(mock_func.called, "{} not called".format(mock_func))
+
+
+class RemotePollLoopActorTestMixin(ActorTestMixin):
+    def build_monitor(self, *args, **kwargs):
+        self.timer = mock.MagicMock(name='timer_mock')
+        self.client = mock.MagicMock(name='client_mock')
+        self.subscriber = mock.Mock(name='subscriber_mock')
+        self.monitor = self.TEST_CLASS.start(
+            self.client, self.timer, *args, **kwargs).proxy()

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list