[ARVADOS] created: 05bb359e5cb312cc5d8ccae94762a4e20a041c5f
git@public.curoverse.com
Fri Oct 3 17:52:03 EDT 2014
at 05bb359e5cb312cc5d8ccae94762a4e20a041c5f (commit)
commit 05bb359e5cb312cc5d8ccae94762a4e20a041c5f
Author: Brett Smith <brett@curoverse.com>
Date: Fri Oct 3 17:53:57 2014 -0400
2881: Add services/nodemanager.
diff --git a/services/nodemanager/.gitignore b/services/nodemanager/.gitignore
new file mode 100644
index 0000000..488ddd5
--- /dev/null
+++ b/services/nodemanager/.gitignore
@@ -0,0 +1,4 @@
+*.pyc
+*.egg-info
+build/
+dist/
diff --git a/services/nodemanager/arvnodeman/__init__.py b/services/nodemanager/arvnodeman/__init__.py
new file mode 100644
index 0000000..ef437c1
--- /dev/null
+++ b/services/nodemanager/arvnodeman/__init__.py
@@ -0,0 +1,19 @@
+#!/usr/bin/env python
+
+# First import and expose all the classes we want to export.
+from .computenode import \
+ ComputeNodeSetupActor, ComputeNodeShutdownActor, ComputeNodeActor, \
+ ShutdownTimer
+from .daemon import NodeManagerDaemonActor
+from .jobqueue import JobQueueMonitorActor, ServerCalculator
+from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
+from .timedcallback import TimedCallBackActor
+
+__all__ = [name for name in locals().keys() if name[0].isupper()]
+
+# We now return you to your regularly scheduled program.
+import _strptime # See <http://bugs.python.org/issue7980#msg221094>.
+import logging
+
+logger = logging.getLogger('arvnodeman')
+logger.addHandler(logging.NullHandler())
diff --git a/services/nodemanager/arvnodeman/clientactor.py b/services/nodemanager/arvnodeman/clientactor.py
new file mode 100644
index 0000000..079cb03
--- /dev/null
+++ b/services/nodemanager/arvnodeman/clientactor.py
@@ -0,0 +1,77 @@
+#!/usr/bin/env python
+
+import logging
+import time
+
+import pykka
+
+from .config import actor_class
+
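+# Call each subscriber with the response, pruning any subscriber whose
+# actor has died so it is not notified again on later polls.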
+def _notify_subscribers(response, subscribers):
+ dead_subscribers = set()
+ for subscriber in subscribers:
+ try:
+ subscriber(response)
+ except pykka.ActorDeadError:
+ dead_subscribers.add(subscriber)
+ subscribers.difference_update(dead_subscribers)
+
+class RemotePollLoopActor(actor_class):
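+ # Generic poll-and-publish loop. Subclasses define _send_request() (and
+ # optionally _item_key() to enable per-key subscriptions); this actor
+ # polls the remote service on a timer and fans results out to
+ # subscribers. Failed polls back off exponentially from min_poll_wait
+ # up to max_poll_wait.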
+ def __init__(self, client, timer_actor, poll_wait=60, max_poll_wait=180):
+ super(RemotePollLoopActor, self).__init__()
+ self._client = client
+ self._timer = timer_actor
+ self._logger = logging.getLogger(self.LOGGER_NAME)
+ self._later = self.actor_ref.proxy()
+ self.min_poll_wait = poll_wait
+ self.max_poll_wait = max_poll_wait
+ self.poll_wait = self.min_poll_wait
+ self.last_poll_time = None
+ self.all_subscribers = set()
+ self.key_subscribers = {}
+ if hasattr(self, '_item_key'):
+ self.subscribe_to = self._subscribe_to
+
+ def _start_polling(self):
+ if self.last_poll_time is None:
+ self.last_poll_time = time.time()
+ self._later.poll()
+
+ def subscribe(self, subscriber):
+ self.all_subscribers.add(subscriber)
+ self._logger.debug("%r subscribed to all events", subscriber)
+ self._start_polling()
+
+ def _subscribe_to(self, key, subscriber):
+ self.key_subscribers.setdefault(key, set()).add(subscriber)
+ self._logger.debug("%r subscribed to events for '%s'", subscriber, key)
+ self._start_polling()
+
+ def _send_request(self):
+ raise NotImplementedError("subclasses must implement request method")
+
+ def _got_response(self, response):
+ self.poll_wait = self.min_poll_wait
+ _notify_subscribers(response, self.all_subscribers)
+ if hasattr(self, '_item_key'):
+ items = {self._item_key(x): x for x in response}
+ for key, subscribers in self.key_subscribers.iteritems():
+ _notify_subscribers(items.get(key), subscribers)
+
+ def _got_error(self, error):
+ self.poll_wait = min(self.poll_wait * 2, self.max_poll_wait)
+ self._logger.warning("Client error: %s - waiting %s seconds",
+ error, self.poll_wait)
+
+ def poll(self):
+ start_time = time.time()
+ try:
+ response = self._send_request()
+ except self.CLIENT_ERRORS as error:
+ self.last_poll_time = start_time
+ self._got_error(error)
+ else:
+ self.last_poll_time += self.poll_wait
+ self._got_response(response)
+ self._timer.schedule(self.last_poll_time + self.poll_wait,
+ self._later.poll)
diff --git a/services/nodemanager/arvnodeman/computenode/__init__.py b/services/nodemanager/arvnodeman/computenode/__init__.py
new file mode 100644
index 0000000..e31e7c1
--- /dev/null
+++ b/services/nodemanager/arvnodeman/computenode/__init__.py
@@ -0,0 +1,279 @@
+#!/usr/bin/env python
+
+import functools
+import itertools
+import logging
+import time
+
+import pykka
+
+from ..clientactor import _notify_subscribers
+from .. import config
+
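+# Convert an Arvados node's modified_at string to a Unix timestamp.
+# strptime() does no time zone conversion and mktime() assumes local
+# time, so parse with an explicit UTC suffix and subtract time.timezone.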
+def arvados_node_mtime(node):
+ return time.mktime(time.strptime(node['modified_at'] + 'UTC',
+ '%Y-%m-%dT%H:%M:%SZ%Z')) - time.timezone
+
+def timestamp_fresh(timestamp, fresh_time):
+ return (time.time() - timestamp) < fresh_time
+
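+# Decorator for actor methods that call remote services. On any of the
+# given errors, log a warning and reschedule the same call through the
+# actor's timer with exponential backoff; a success resets the wait.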
+def _retry(errors):
+ def decorator(orig_func):
+ @functools.wraps(orig_func)
+ def wrapper(self, *args, **kwargs):
+ try:
+ orig_func(self, *args, **kwargs)
+ except errors as error:
+ self._logger.warning(
+ "Client error: %s - waiting %s seconds",
+ error, self.retry_wait)
+ self._timer.schedule(self.retry_wait,
+ getattr(self._later, orig_func.__name__),
+ *args, **kwargs)
+ self.retry_wait = min(self.retry_wait * 2,
+ self.max_retry_wait)
+ else:
+ self.retry_wait = self.min_retry_wait
+ return wrapper
+ return decorator
+
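+# Wrapper around a libcloud driver. It stores canned keyword arguments
+# for listing and creating nodes, proxies other non-extension methods to
+# the real driver, and leaves cloud-specific hooks (arvados_create_kwargs,
+# post_create_node, node_start_time) for subclasses to implement.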
+class BaseComputeNodeDriver(object):
+ def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class):
+ self.real = driver_class(**auth_kwargs)
+ self.list_kwargs = list_kwargs
+ self.create_kwargs = create_kwargs
+
+ def __getattr__(self, name):
+ # Proxy non-extension methods to the real driver.
+ if (not name.startswith('_') and not name.startswith('ex_')
+ and hasattr(self.real, name)):
+ return getattr(self.real, name)
+ else:
+ # Raise AttributeError directly; object has no __getattr__ for
+ # this proxy to delegate to.
+ raise AttributeError(name)
+
+ def search_for(self, term, list_method, key=lambda item: item.id):
+ cache_key = (list_method, term)
+ if cache_key not in self.SEARCH_CACHE:
+ results = [item for item in getattr(self.real, list_method)()
+ if key(item) == term]
+ count = len(results)
+ if count != 1:
+ raise ValueError("{} returned {} results for '{}'".format(
+ list_method, count, term))
+ self.SEARCH_CACHE[cache_key] = results[0]
+ return self.SEARCH_CACHE[cache_key]
+
+ def list_nodes(self):
+ return self.real.list_nodes(**self.list_kwargs)
+
+ def arvados_create_kwargs(self, arvados_node):
+ raise NotImplementedError("BaseComputeNodeDriver.arvados_create_kwargs")
+
+ def create_node(self, size, arvados_node):
+ kwargs = self.create_kwargs.copy()
+ kwargs.update(self.arvados_create_kwargs(arvados_node))
+ kwargs['size'] = size
+ return self.real.create_node(**kwargs)
+
+ def post_create_node(self, cloud_node, arvados_node):
+ pass
+
+ @classmethod
+ def node_start_time(cls, node):
+ raise NotImplementedError("BaseComputeNodeDriver.node_start_time")
+
+
+ComputeNodeDriverClass = BaseComputeNodeDriver
+
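+# Boot one compute node: create or recycle an Arvados node record, create
+# the cloud node, then run post-creation setup. Each step schedules the
+# next via self._later, and each is wrapped in @_retry so transient API
+# errors are retried with backoff. Subscribers are notified once setup
+# finishes.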
+class ComputeNodeSetupActor(config.actor_class):
+ def __init__(self, timer_actor, arvados_client, cloud_client,
+ cloud_size, arvados_node=None,
+ retry_wait=1, max_retry_wait=180):
+ super(ComputeNodeSetupActor, self).__init__()
+ self._timer = timer_actor
+ self._arvados = arvados_client
+ self._cloud = cloud_client
+ self._later = self.actor_ref.proxy()
+ self._logger = logging.getLogger('arvnodeman.nodeup')
+ self.cloud_size = cloud_size
+ self.subscribers = set()
+ self.min_retry_wait = retry_wait
+ self.max_retry_wait = max_retry_wait
+ self.retry_wait = retry_wait
+ self.arvados_node = None
+ self.cloud_node = None
+ if arvados_node is None:
+ self._later.create_arvados_node()
+ else:
+ self._later.prepare_arvados_node(arvados_node)
+
+ @_retry(config.ARVADOS_ERRORS)
+ def create_arvados_node(self):
+ self.arvados_node = self._arvados.nodes().create(body={}).execute()
+ self._later.create_cloud_node()
+
+ @_retry(config.ARVADOS_ERRORS)
+ def prepare_arvados_node(self, node):
+ self.arvados_node = self._arvados.nodes().update(
+ uuid=node['uuid'],
+ body={'hostname': None,
+ 'ip_address': None,
+ 'slot_number': None,
+ 'first_ping_at': None,
+ 'last_ping_at': None,
+ 'info': {'ec2_instance_id': None,
+ 'last_action': "Prepared by Node Manager"}}
+ ).execute()
+ self._later.create_cloud_node()
+
+ @_retry(config.CLOUD_ERRORS)
+ def create_cloud_node(self):
+ self._logger.info("Creating cloud node with size %s.",
+ self.cloud_size.name)
+ self.cloud_node = self._cloud.create_node(self.cloud_size,
+ self.arvados_node)
+ self._logger.info("Cloud node %s created. Setting up.",
+ self.cloud_node.id)
+ self._later.setup_cloud_node()
+
+ @_retry(config.CLOUD_ERRORS)
+ def setup_cloud_node(self):
+ self._cloud.post_create_node(self.cloud_node, self.arvados_node)
+ self._logger.info("Cloud node %s set up.", self.cloud_node.id)
+ _notify_subscribers(self._later, self.subscribers)
+ self.subscribers = None
+
+ def stop_if_no_cloud_node(self):
+ if self.cloud_node is None:
+ self.stop()
+
+ def subscribe(self, subscriber):
+ if self.subscribers is None:
+ try:
+ subscriber(self._later)
+ except pykka.ActorDeadError:
+ pass
+ else:
+ self.subscribers.add(subscriber)
+
+
+class ComputeNodeShutdownActor(config.actor_class):
+ def __init__(self, timer_actor, cloud_client, cloud_node,
+ retry_wait=1, max_retry_wait=180):
+ super(ComputeNodeShutdownActor, self).__init__()
+ self._timer = timer_actor
+ self._cloud = cloud_client
+ self._later = self.actor_ref.proxy()
+ self._logger = logging.getLogger('arvnodeman.nodedown')
+ self.cloud_node = cloud_node
+ self.min_retry_wait = retry_wait
+ self.max_retry_wait = max_retry_wait
+ self.retry_wait = retry_wait
+ self._later.shutdown_node()
+
+ @_retry(config.CLOUD_ERRORS)
+ def shutdown_node(self):
+ self._cloud.destroy_node(self.cloud_node)
+ self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
+
+
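+# Track a node's recurring shutdown windows. shutdown_windows is a list
+# of durations in minutes: counting from start_time, the node must stay
+# up for the first duration, may shut down during the second, and so on,
+# cycling through the list forever. For example, [54, 5, 1] opens a
+# five-minute shutdown window starting at the 54th minute of each hour
+# of uptime.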
+class ShutdownTimer(object):
+ def __init__(self, start_time, shutdown_windows):
+ first_window = shutdown_windows[0]
+ shutdown_windows = list(shutdown_windows[1:])
+ self._next_opening = start_time + (60 * first_window)
+ if len(shutdown_windows) % 2:
+ shutdown_windows.append(first_window)
+ else:
+ shutdown_windows[-1] += first_window
+ self.shutdown_windows = itertools.cycle([60 * n
+ for n in shutdown_windows])
+ self._open_start = self._next_opening
+ self._open_for = next(self.shutdown_windows)
+
+ def _advance_opening(self):
+ while self._next_opening < time.time():
+ self._open_start = self._next_opening
+ self._next_opening += self._open_for + next(self.shutdown_windows)
+ self._open_for = next(self.shutdown_windows)
+
+ def next_opening(self):
+ self._advance_opening()
+ return self._next_opening
+
+ def window_open(self):
+ self._advance_opening()
+ return 0 < (time.time() - self._open_start) < self._open_for
+
+
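+# Represent one running cloud node. This actor pairs the node with its
+# Arvados record when offered a match, tracks updates to both, and
+# suggests itself for shutdown (by notifying subscribers) whenever a
+# shutdown window is open and the node is eligible: its Arvados record
+# is fresh and reports an idle SLURM state, or it has no paired record.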
+class ComputeNodeActor(config.actor_class):
+ def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
+ timer_actor, arvados_node=None,
+ poll_stale_after=600, node_stale_after=3600):
+ super(ComputeNodeActor, self).__init__()
+ self._later = self.actor_ref.proxy()
+ self._logger = logging.getLogger('arvnodeman.computenode')
+ self._last_log = None
+ self._shutdowns = shutdown_timer
+ self._timer = timer_actor
+ self.cloud_node = cloud_node
+ self.cloud_node_start_time = cloud_node_start_time
+ self.arvados_node = arvados_node
+ self.poll_stale_after = poll_stale_after
+ self.node_stale_after = node_stale_after
+ self.subscribers = set()
+ self.last_shutdown_opening = None
+ self._later.consider_shutdown()
+
+ def subscribe(self, subscriber):
+ self.subscribers.add(subscriber)
+
+ def _debug(self, msg, *args):
+ if msg == self._last_log:
+ return
+ self._last_log = msg
+ self._logger.debug(msg, *args)
+
+ def _shutdown_eligible(self):
+ if self.arvados_node is None:
+ return timestamp_fresh(self.cloud_node_start_time,
+ self.node_stale_after)
+ else:
+ return (timestamp_fresh(arvados_node_mtime(self.arvados_node),
+ self.poll_stale_after) and
+ (self.arvados_node['info'].get('slurm_state') == 'idle'))
+
+ def consider_shutdown(self):
+ next_opening = self._shutdowns.next_opening()
+ if self._shutdowns.window_open():
+ if self._shutdown_eligible():
+ self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
+ _notify_subscribers(self._later, self.subscribers)
+ else:
+ self._debug("Node %s shutdown window open but node busy.",
+ self.cloud_node.id)
+ else:
+ self._debug("Node %s shutdown window closed. Next at %s.",
+ self.cloud_node.id, time.ctime(next_opening))
+ if self.last_shutdown_opening != next_opening:
+ self._timer.schedule(next_opening, self._later.consider_shutdown)
+ self.last_shutdown_opening = next_opening
+
+ def offer_arvados_pair(self, arvados_node):
+ if self.arvados_node is not None:
+ return None
+ elif arvados_node['ip_address'] in self.cloud_node.private_ips:
+ self.arvados_node = arvados_node
+ return self.cloud_node.id
+ else:
+ return None
+
+ def update_cloud_node(self, cloud_node):
+ if cloud_node is not None:
+ self.cloud_node = cloud_node
+ self._later.consider_shutdown()
+
+ def update_arvados_node(self, arvados_node):
+ if arvados_node is not None:
+ self.arvados_node = arvados_node
+ self._later.consider_shutdown()
diff --git a/services/nodemanager/arvnodeman/computenode/dummy.py b/services/nodemanager/arvnodeman/computenode/dummy.py
new file mode 100644
index 0000000..3a05891
--- /dev/null
+++ b/services/nodemanager/arvnodeman/computenode/dummy.py
@@ -0,0 +1,42 @@
+#!/usr/bin/env python
+
+import time
+
+import libcloud.compute.providers as cloud_provider
+import libcloud.compute.types as cloud_types
+
+from . import BaseComputeNodeDriver
+
+class ComputeNodeDriver(BaseComputeNodeDriver):
+ DEFAULT_DRIVER = cloud_provider.get_driver(cloud_types.Provider.DUMMY)
+ DEFAULT_REAL = DEFAULT_DRIVER('ComputeNodeDriver')
+ DUMMY_START_TIME = time.time()
+
+ def __init__(self, auth_kwargs, list_kwargs, create_kwargs,
+ driver_class=DEFAULT_DRIVER):
+ super(ComputeNodeDriver, self).__init__(
+ auth_kwargs, list_kwargs, create_kwargs, driver_class)
+ if driver_class is self.DEFAULT_DRIVER:
+ self.real = self.DEFAULT_REAL
+
+ def _ensure_private_ip(self, node):
+ if not node.private_ips:
+ node.private_ips = ['10.10.0.{}'.format(node.id)]
+
+ def arvados_create_kwargs(self, arvados_node):
+ return {}
+
+ def list_nodes(self):
+ nodelist = super(ComputeNodeDriver, self).list_nodes()
+ for node in nodelist:
+ self._ensure_private_ip(node)
+ return nodelist
+
+ def create_node(self, size, arvados_node):
+ node = super(ComputeNodeDriver, self).create_node(size, arvados_node)
+ self._ensure_private_ip(node)
+ return node
+
+ @classmethod
+ def node_start_time(cls, node):
+ return cls.DUMMY_START_TIME
diff --git a/services/nodemanager/arvnodeman/computenode/ec2.py b/services/nodemanager/arvnodeman/computenode/ec2.py
new file mode 100644
index 0000000..9f6bea7
--- /dev/null
+++ b/services/nodemanager/arvnodeman/computenode/ec2.py
@@ -0,0 +1,88 @@
+#!/usr/bin/env python
+
+import time
+
+import libcloud.compute.base as cloud_base
+import libcloud.compute.providers as cloud_provider
+import libcloud.compute.types as cloud_types
+from libcloud.compute.drivers import ec2 as cloud_ec2
+
+from . import BaseComputeNodeDriver
+
+### Monkeypatch libcloud to support AWS' new SecurityGroup API.
+class ANMEC2Connection(cloud_ec2.EC2Connection):
+ def request(self, *args, **kwargs):
+ params = kwargs.get('params')
+ if (params is not None) and (params.get('Action') == 'RunInstances'):
+ for key in params.keys():
+ if key.startswith('SecurityGroup.'):
+ new_key = key.replace('Group.', 'GroupId.', 1)
+ params[new_key] = params.pop(key).id
+ kwargs['params'] = params
+ return super(ANMEC2Connection, self).request(*args, **kwargs)
+
+
+class ANMEC2NodeDriver(cloud_ec2.EC2NodeDriver):
+ connectionCls = ANMEC2Connection
+
+
+class ComputeNodeDriver(BaseComputeNodeDriver):
+ DEFAULT_DRIVER = ANMEC2NodeDriver
+### End monkeypatch
+ SEARCH_CACHE = {}
+
+ def __init__(self, auth_kwargs, list_kwargs, create_kwargs,
+ driver_class=DEFAULT_DRIVER):
+ # We need full lists of keys up front because these loops modify
+ # dictionaries in-place.
+ for key in list_kwargs.keys():
+ list_kwargs[key.replace('_', ':')] = list_kwargs.pop(key)
+ self.tags = {key[4:]: value
+ for key, value in list_kwargs.iteritems()
+ if key.startswith('tag:')}
+ super(ComputeNodeDriver, self).__init__(
+ auth_kwargs, {'ex_filters': list_kwargs}, create_kwargs,
+ driver_class)
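+ # Let _init_* methods transform individual Cloud Create settings:
+ # each may return a replacement (key, value) pair for the libcloud
+ # create_node() call, or None to consume the setting entirely.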
+ for key in self.create_kwargs.keys():
+ init_method = getattr(self, '_init_' + key, None)
+ if init_method is not None:
+ new_pair = init_method(self.create_kwargs.pop(key))
+ if new_pair is not None:
+ self.create_kwargs[new_pair[0]] = new_pair[1]
+
+ def _init_image_id(self, image_id):
+ return 'image', self.search_for(image_id, 'list_images')
+
+ def _init_ping_host(self, ping_host):
+ self.ping_host = ping_host
+
+ def _init_security_groups(self, group_names):
+ return 'ex_security_groups', [
+ self.search_for(gname.strip(), 'ex_get_security_groups')
+ for gname in group_names.split(',')]
+
+ def _init_subnet_id(self, subnet_id):
+ return 'ex_subnet', self.search_for(subnet_id, 'ex_list_subnets')
+
+ def _init_ssh_key(self, filename):
+ with open(filename) as ssh_file:
+ key = cloud_base.NodeAuthSSHKey(ssh_file.read())
+ return 'auth', key
+
+ def arvados_create_kwargs(self, arvados_node):
+ result = {'ex_metadata': self.tags.copy(),
+ 'name': '{}.{}'.format(arvados_node['hostname'],
+ arvados_node['domain'])}
+ ping_secret = arvados_node['info'].get('ping_secret')
+ if ping_secret is not None:
+ ping_url = ('https://{}/arvados/v1/nodes/{}/ping?ping_secret={}'.
+ format(self.ping_host, arvados_node['uuid'],
+ ping_secret))
+ result['ex_userdata'] = ping_url
+ return result
+
+ @classmethod
+ def node_start_time(cls, node):
+ time_str = node.extra['launch_time'].split('.', 2)[0] + 'UTC'
+ return time.mktime(time.strptime(
+ time_str, '%Y-%m-%dT%H:%M:%S%Z')) - time.timezone
diff --git a/services/nodemanager/arvnodeman/config.py b/services/nodemanager/arvnodeman/config.py
new file mode 100644
index 0000000..2cfaaee
--- /dev/null
+++ b/services/nodemanager/arvnodeman/config.py
@@ -0,0 +1,97 @@
+#!/usr/bin/env python
+
+import ConfigParser
+import importlib
+import logging
+import ssl
+
+import apiclient.errors as apierror
+import arvados
+import httplib2
+import libcloud.common.types as cloud_types
+import pykka
+
+# IOError is the base class for socket.error and friends.
+# It seems like it hits the sweet spot for operations we want to retry:
+# it's low-level, but unlikely to catch code bugs.
+NETWORK_ERRORS = (IOError, ssl.SSLError)
+ARVADOS_ERRORS = NETWORK_ERRORS + (apierror.Error,)
+CLOUD_ERRORS = NETWORK_ERRORS + (cloud_types.LibcloudError,)
+
+actor_class = pykka.ThreadingActor
+
+class NodeManagerConfig(ConfigParser.SafeConfigParser):
+ LOGGING_NONLEVELS = frozenset(['file'])
+
+ def __init__(self, *args, **kwargs):
+ # Can't use super() because SafeConfigParser is an old-style class.
+ ConfigParser.SafeConfigParser.__init__(self, *args, **kwargs)
+ for sec_name, settings in {
+ 'Arvados': {'insecure': 'no',
+ 'timeout': '15'},
+ 'Daemon': {'max_nodes': '1',
+ 'poll_time': '60',
+ 'max_poll_time': '300',
+ 'poll_stale_after': '600',
+ 'node_stale_after': str(60 * 60 * 2)},
+ 'Logging': {'file': '/dev/stderr',
+ 'level': 'WARNING'},
+ }.iteritems():
+ if not self.has_section(sec_name):
+ self.add_section(sec_name)
+ for opt_name, value in settings.iteritems():
+ if not self.has_option(sec_name, opt_name):
+ self.set(sec_name, opt_name, value)
+
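+ # Return one configuration section as a dictionary, optionally
+ # coercing each value with transformer; values it cannot coerce are
+ # kept as strings.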
+ def get_section(self, section, transformer=None):
+ result = self._dict()
+ for key, value in self.items(section):
+ if transformer is not None:
+ try:
+ value = transformer(value)
+ except (TypeError, ValueError):
+ pass
+ result[key] = value
+ return result
+
+ def log_levels(self):
+ return {key: getattr(logging, self.get('Logging', key).upper())
+ for key in self.options('Logging')
+ if key not in self.LOGGING_NONLEVELS}
+
+ def new_arvados_client(self):
+ if self.has_option('Daemon', 'certs_file'):
+ certs_file = self.get('Daemon', 'certs_file')
+ else:
+ certs_file = None
+ insecure = self.getboolean('Arvados', 'insecure')
+ http = httplib2.Http(timeout=self.getint('Arvados', 'timeout'),
+ ca_certs=certs_file,
+ disable_ssl_certificate_validation=insecure)
+ return arvados.api('v1',
+ cache=False, # Don't reuse an existing client.
+ host=self.get('Arvados', 'host'),
+ token=self.get('Arvados', 'token'),
+ insecure=insecure,
+ http=http)
+
+ def new_cloud_client(self):
+ module = importlib.import_module('arvnodeman.computenode.' +
+ self.get('Cloud', 'provider'))
+ return module.ComputeNodeDriver(self.get_section('Cloud Credentials'),
+ self.get_section('Cloud List'),
+ self.get_section('Cloud Create'))
+
+ def node_sizes(self, all_sizes):
+ size_kwargs = {}
+ for sec_name in self.sections():
+ sec_words = sec_name.split(None, 2)
+ if sec_words[0] != 'Size':
+ continue
+ size_kwargs[sec_words[1]] = self.get_section(sec_name, int)
+ return [(size, size_kwargs[size.id]) for size in all_sizes
+ if size.id in size_kwargs]
+
+ def shutdown_windows(self):
+ return [int(n)
+ for n in self.get('Cloud', 'shutdown_windows').split(',')]
diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
new file mode 100644
index 0000000..8d8f107
--- /dev/null
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -0,0 +1,263 @@
+#!/usr/bin/env python
+
+import functools
+import logging
+import time
+
+import pykka
+
+from . import computenode as cnode
+from .config import actor_class
+
+class NodeManagerDaemonActor(actor_class):
+ class PairingTracker(object):
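+ # Reconcile a freshly polled item list against two dictionaries of
+ # known items (paired and unpaired). Its generators yield new,
+ # unpaired, and vanished items so the daemon can register, pair, or
+ # clean up the corresponding actors.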
+ def __init__(self, key_func, paired_items, unpaired_items):
+ self.key_func = key_func
+ self._paired_items = paired_items
+ self._unpaired_items = unpaired_items
+
+ def all_items(self, response):
+ self.unseen = set(self._paired_items.iterkeys())
+ self.unseen.update(self._unpaired_items.iterkeys())
+ for item in response:
+ key = self.key_func(item)
+ yield key, item
+ if key in self.unseen:
+ self.unseen.remove(key)
+
+ def new_items(self, response):
+ for key, item in self.all_items(response):
+ if key not in self.unseen:
+ yield key, item
+
+ def unpaired_items(self, response):
+ for key, item in self.all_items(response):
+ if key not in self._paired_items:
+ yield key, item
+
+ def unseen_items(self):
+ for key in self.unseen:
+ if key in self._paired_items:
+ home_dict = self._paired_items
+ else:
+ home_dict = self._unpaired_items
+ yield home_dict, key
+
+
+ def __init__(self, server_wishlist_actor, arvados_nodes_actor,
+ cloud_nodes_actor, timer_actor,
+ arvados_factory, cloud_factory,
+ shutdown_windows, max_nodes,
+ poll_stale_after=600, node_stale_after=7200,
+ node_setup_class=cnode.ComputeNodeSetupActor,
+ node_shutdown_class=cnode.ComputeNodeShutdownActor,
+ node_actor_class=cnode.ComputeNodeActor):
+ super(NodeManagerDaemonActor, self).__init__()
+ self._node_setup = node_setup_class
+ self._node_shutdown = node_shutdown_class
+ self._node_actor = node_actor_class
+ self._timer = timer_actor
+ self._new_arvados = arvados_factory
+ self._new_cloud = cloud_factory
+ self._cloud_driver = self._new_cloud()
+ self._logger = logging.getLogger('arvnodeman.daemon')
+ self._later = self.actor_ref.proxy()
+ self.shutdown_windows = shutdown_windows
+ self.max_nodes = max_nodes
+ self.poll_stale_after = poll_stale_after
+ self.node_stale_after = node_stale_after
+ self.last_polls = {}
+ for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
+ poll_actor = locals()[poll_name + '_actor']
+ poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
+ setattr(self, '_{}_actor'.format(poll_name), poll_actor)
+ self.last_polls[poll_name] = -self.poll_stale_after
+ # Map cloud node IDs, or Arvados node UUIDs, to their ComputeNodeActors.
+ self.unpaired_clouds = {}
+ self.paired_clouds = {}
+ self.paired_arv = {}
+ self.unpaired_arv = {} # Arvados node UUIDs to full node data
+ self.assigned_arv = {} # Arvados node UUIDs to assignment timestamps
+ self.booting = {} # Actor IDs to ComputeNodeSetupActors
+ self.shutdowns = {} # Cloud node IDs to ComputeNodeShutdownActors
+ self._logger.debug("Daemon initialized")
+
+ def _update_poll_time(self, poll_key):
+ self.last_polls[poll_key] = time.time()
+
+ def _pair_nodes(self, cloud_key, arv_key, actor=None):
+ if actor is None:
+ actor = self.unpaired_clouds[cloud_key]
+ self._logger.info("Cloud node %s has associated with Arvados node %s",
+ cloud_key, arv_key)
+ self.paired_clouds[cloud_key] = actor
+ self.paired_arv[arv_key] = actor
+ self._arvados_nodes_actor.subscribe_to(arv_key,
+ actor.update_arvados_node)
+ self.unpaired_clouds.pop(cloud_key, None)
+ self.unpaired_arv.pop(arv_key, None)
+
+ def _new_node(self, cloud_node, arvados_node=None):
+ start_time = self._cloud_driver.node_start_time(cloud_node)
+ shutdown_timer = cnode.ShutdownTimer(start_time,
+ self.shutdown_windows)
+ actor = self._node_actor.start(
+ cloud_node=cloud_node,
+ cloud_node_start_time=start_time,
+ shutdown_timer=shutdown_timer,
+ timer_actor=self._timer,
+ arvados_node=arvados_node,
+ poll_stale_after=self.poll_stale_after,
+ node_stale_after=self.node_stale_after).proxy()
+ actor.subscribe(self._later.shutdown_offer)
+ self._cloud_nodes_actor.subscribe_to(cloud_node.id,
+ actor.update_cloud_node)
+ if arvados_node is not None:
+ self._pair_nodes(cloud_node.id, arvados_node['uuid'], actor)
+ return actor
+
+ def update_cloud_nodes(self, nodelist):
+ self._update_poll_time('cloud_nodes')
+ pairs = self.PairingTracker(lambda n: n.id,
+ self.paired_clouds, self.unpaired_clouds)
+ for key, node in pairs.new_items(nodelist):
+ actor = self._new_node(node)
+ for arv_key, arv_node in self.unpaired_arv.iteritems():
+ if actor.offer_arvados_pair(arv_node).get():
+ self._pair_nodes(key, arv_key, actor)
+ break
+ else:
+ self._logger.info("Registering new cloud node %s", key)
+ self.unpaired_clouds[key] = actor
+ for source, key in pairs.unseen_items():
+ source.pop(key).stop()
+ if key in self.shutdowns:
+ self.shutdowns.pop(key).stop()
+
+ def update_arvados_nodes(self, nodelist):
+ self._update_poll_time('arvados_nodes')
+ pairs = self.PairingTracker(lambda n: n['uuid'],
+ self.paired_arv, self.unpaired_arv)
+ for key, node in pairs.unpaired_items(nodelist):
+ if key not in self.unpaired_arv:
+ self._logger.info("Registering new Arvados node %s", key)
+ self.unpaired_arv[key] = node
+ for cloud_key, actor in self.unpaired_clouds.iteritems():
+ if actor.offer_arvados_pair(node).get():
+ self._pair_nodes(cloud_key, key, actor)
+ break
+ for source, key in pairs.unseen_items():
+ if source is self.unpaired_arv:
+ del self.unpaired_arv[key]
+
+ def _node_count(self):
+ up = sum(len(nodelist) for nodelist in
+ [self.paired_clouds, self.unpaired_clouds, self.booting])
+ return up - len(self.shutdowns)
+
+ def _nodes_wanted(self):
+ return len(self.last_wishlist) - self._node_count()
+
+ def _nodes_excess(self):
+ return -self._nodes_wanted()
+
+ def update_server_wishlist(self, wishlist):
+ self._update_poll_time('server_wishlist')
+ self.last_wishlist = wishlist[:self.max_nodes]
+ nodes_wanted = self._nodes_wanted()
+ if nodes_wanted > 0:
+ self._later.start_node()
+ elif (nodes_wanted < 0) and self.booting:
+ self._later.stop_booting_node()
+
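+ # Decorator: run the wrapped method only when every poll (server
+ # wishlist, Arvados nodes, cloud nodes) is fresh enough, so the daemon
+ # never starts or stops nodes based on stale information.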
+ def _check_poll_freshness(orig_func):
+ @functools.wraps(orig_func)
+ def wrapper(self, *args, **kwargs):
+ now = time.time()
+ if all(now - t < self.poll_stale_after
+ for t in self.last_polls.itervalues()):
+ return orig_func(self, *args, **kwargs)
+ else:
+ return None
+ return wrapper
+
+ def _find_reusable_arvados_node(self):
+ for node in self.unpaired_arv.itervalues():
+ assigned_at = self.assigned_arv.get(node['uuid'],
+ -self.node_stale_after)
+ if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
+ self.node_stale_after) and
+ not cnode.timestamp_fresh(assigned_at,
+ self.node_stale_after)):
+ return node
+ return None
+
+ @_check_poll_freshness
+ def start_node(self):
+ nodes_wanted = self._nodes_wanted()
+ if nodes_wanted < 1:
+ return None
+ arvados_node = self._find_reusable_arvados_node()
+ size = self.last_wishlist[nodes_wanted - 1]
+ self._logger.info("Want %s more nodes. Booting a %s node.",
+ nodes_wanted, size.name)
+ new_setup = self._node_setup.start(
+ timer_actor=self._timer,
+ arvados_client=self._new_arvados(),
+ arvados_node=arvados_node,
+ cloud_client=self._new_cloud(),
+ cloud_size=size).proxy()
+ self.booting[new_setup.actor_ref.actor_urn] = new_setup
+ if arvados_node is not None:
+ self.assigned_arv[arvados_node['uuid']] = time.time()
+ new_setup.subscribe(self._later.node_up)
+ if nodes_wanted > 1:
+ self._later.start_node()
+
+ def _actor_nodes(self, node_actor):
+ return pykka.get_all([node_actor.cloud_node, node_actor.arvados_node])
+
+ def node_up(self, setup_proxy):
+ cloud_node, arvados_node = self._actor_nodes(setup_proxy)
+ self._new_node(cloud_node, arvados_node)
+ del self.booting[setup_proxy.actor_ref.actor_urn]
+ self.assigned_arv.pop(arvados_node['uuid'], None)
+ setup_proxy.stop()
+
+ @_check_poll_freshness
+ def stop_booting_node(self):
+ nodes_excess = self._nodes_excess()
+ if (nodes_excess < 1) or not self.booting:
+ return None
+ for key, node in self.booting.iteritems():
+ node.stop_if_no_cloud_node().get()
+ if not node.actor_ref.is_alive():
+ del self.booting[key]
+ if nodes_excess > 1:
+ self._later.stop_booting_node()
+ break
+
+ @_check_poll_freshness
+ def shutdown_offer(self, node_actor):
+ if self._nodes_excess() < 1:
+ return None
+ cloud_node, arvados_node = self._actor_nodes(node_actor)
+ if cloud_node.id in self.shutdowns:
+ return None
+ shutdown = self._node_shutdown.start(timer_actor=self._timer,
+ cloud_client=self._new_cloud(),
+ cloud_node=cloud_node).proxy()
+ self.shutdowns[cloud_node.id] = shutdown
+
+ def shutdown(self):
+ self._logger.info("Shutting down after signal.")
+ self.poll_stale_after = -1 # Inhibit starting/stopping nodes
+ for bootnode in self.booting.itervalues():
+ bootnode.stop_if_no_cloud_node()
+ self._later.await_shutdown()
+
+ def await_shutdown(self):
+ if any(node.actor_ref.is_alive() for node in self.booting.itervalues()):
+ self._timer.schedule(time.time() + 1, self._later.await_shutdown)
+ else:
+ self.stop()
diff --git a/services/nodemanager/arvnodeman/jobqueue.py b/services/nodemanager/arvnodeman/jobqueue.py
new file mode 100644
index 0000000..9daf556
--- /dev/null
+++ b/services/nodemanager/arvnodeman/jobqueue.py
@@ -0,0 +1,78 @@
+#!/usr/bin/env python
+
+import arvados.errors as arverror
+
+from . import clientactor
+
+class ServerCalculator(object):
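+ # Translate the job queue into a "server wishlist": for each queued
+ # job, pick the cheapest configured size that meets the job's runtime
+ # constraints.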
+ class SizeWrapper(object):
+ def __init__(self, real_size, **kwargs):
+ self.real = real_size
+ for name in ['id', 'name', 'ram', 'disk', 'bandwidth', 'price',
+ 'extra']:
+ setattr(self, name, getattr(self.real, name))
+ self.cores = kwargs.pop('cores')
+ self.scratch = self.disk
+ for name, override in kwargs.iteritems():
+ if not hasattr(self, name):
+ raise ValueError("unrecognized size field '%s'" % (name,))
+ setattr(self, name, override)
+
+ def meets_constraints(self, **kwargs):
+ for name, want_value in kwargs.iteritems():
+ have_value = getattr(self, name)
+ if (have_value != 0) and (have_value < want_value):
+ return False
+ return True
+
+
+ def __init__(self, server_list, max_nodes=None):
+ self.sizes = [self.SizeWrapper(s, **kws) for s, kws in server_list]
+ self.sizes.sort(key=lambda s: s.price)
+ self.max_nodes = max_nodes or float("inf")
+
+ @staticmethod
+ def coerce_int(x, fallback):
+ try:
+ return int(x)
+ except (TypeError, ValueError):
+ return fallback
+
+ def size_for_constraints(self, constraints):
+ want_value = lambda key: self.coerce_int(constraints.get(key), 0)
+ wants = {'cores': want_value('min_cores_per_node'),
+ 'ram': want_value('min_ram_mb_per_node'),
+ 'scratch': want_value('min_scratch_mb_per_node')}
+ for size in self.sizes:
+ if size.meets_constraints(**wants):
+ return size
+ return None
+
+ def servers_for_queue(self, queue):
+ servers = []
+ for job in queue:
+ constraints = job['runtime_constraints']
+ want_count = self.coerce_int(constraints.get('min_nodes'), 1)
+ size = self.size_for_constraints(constraints)
+ if (want_count < self.max_nodes) and (size is not None):
+ servers.extend([size.real] * max(1, want_count))
+ return servers
+
+
+class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
+ CLIENT_ERRORS = (arverror.ApiError,)
+ LOGGER_NAME = 'arvnodeman.jobqueue'
+
+ def __init__(self, client, timer_actor, server_calc, *args, **kwargs):
+ super(JobQueueMonitorActor, self).__init__(
+ client, timer_actor, *args, **kwargs)
+ self._calculator = server_calc
+
+ def _send_request(self):
+ return self._client.jobs().queue().execute()['items']
+
+ def _got_response(self, queue):
+ server_list = self._calculator.servers_for_queue(queue)
+ self._logger.debug("Sending server wishlist: %s",
+ ', '.join(s.name for s in server_list))
+ return super(JobQueueMonitorActor, self)._got_response(server_list)
diff --git a/services/nodemanager/arvnodeman/launcher.py b/services/nodemanager/arvnodeman/launcher.py
new file mode 100644
index 0000000..7539a9b
--- /dev/null
+++ b/services/nodemanager/arvnodeman/launcher.py
@@ -0,0 +1,120 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import argparse
+import logging
+import signal
+import sys
+import time
+
+import daemon
+import pykka
+
+from . import * # This imports all the Actor classes.
+from . import config as nmconfig
+
+node_daemon = None
+
+def abort(msg, code=1):
+ print("arvados-node-manager: " + msg)
+ sys.exit(code)
+
+def parse_cli(args):
+ parser = argparse.ArgumentParser(
+ prog='arvados-node-manager',
+ description="Dynamically allocate Arvados cloud compute nodes")
+ parser.add_argument(
+ '--foreground', action='store_true', default=False,
+ help="Run in the foreground. Don't daemonize.")
+ parser.add_argument(
+ '--config', help="Path to configuration file")
+ return parser.parse_args(args)
+
+def load_config(path):
+ if not path:
+ abort("No --config file specified", 2)
+ config = nmconfig.NodeManagerConfig()
+ try:
+ with open(path) as config_file:
+ config.readfp(config_file)
+ except (IOError, OSError) as error:
+ abort("Error reading configuration file {}: {}".format(path, error))
+ return config
+
+def setup_logging(path, level, **sublevels):
+ handler = logging.FileHandler(path)
+ handler.setFormatter(logging.Formatter(
+ '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
+ '%Y-%m-%d %H:%M:%S'))
+ root_logger = logging.getLogger()
+ root_logger.addHandler(handler)
+ root_logger.setLevel(level)
+ for logger_name, sublevel in sublevels.iteritems():
+ sublogger = logging.getLogger(logger_name)
+ sublogger.setLevel(sublevel)
+
+def launch_pollers(config):
+ cloud_client = config.new_cloud_client()
+ arvados_client = config.new_arvados_client()
+ size_list = config.node_sizes(cloud_client.list_sizes())
+ if not size_list:
+ abort("No valid sizes configured")
+
+ server_calculator = ServerCalculator(
+ size_list, config.getint('Daemon', 'max_nodes'))
+ poll_time = config.getint('Daemon', 'poll_time')
+ max_poll_time = config.getint('Daemon', 'max_poll_time')
+
+ timer = TimedCallBackActor.start(poll_time / 10.0).proxy()
+ cloud_node_poller = CloudNodeListMonitorActor.start(
+ cloud_client, timer, poll_time, max_poll_time).proxy()
+ arvados_node_poller = ArvadosNodeListMonitorActor.start(
+ arvados_client, timer, poll_time, max_poll_time).proxy()
+ job_queue_poller = JobQueueMonitorActor.start(
+ config.new_arvados_client(), timer, server_calculator,
+ poll_time, max_poll_time).proxy()
+ return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
+
+_caught_signals = {}
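+# Escalating signal handler: the first signal asks the daemon to shut
+# down cleanly, a second stops all actors immediately, and any further
+# signal exits the process directly.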
+def shutdown_signal(signal_code, frame):
+ current_count = _caught_signals.get(signal_code, 0)
+ _caught_signals[signal_code] = current_count + 1
+ if node_daemon is None:
+ sys.exit(-signal_code)
+ elif current_count == 0:
+ node_daemon.shutdown()
+ elif current_count == 1:
+ pykka.ActorRegistry.stop_all()
+ else:
+ sys.exit(-signal_code)
+
+def main(args=None):
+ global node_daemon
+ args = parse_cli(args)
+ config = load_config(args.config)
+
+ if not args.foreground:
+ daemon.DaemonContext().open()
+ for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
+ signal.signal(sigcode, shutdown_signal)
+
+ setup_logging(config.get('Logging', 'file'), **config.log_levels())
+ timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
+ launch_pollers(config)
+ node_daemon = NodeManagerDaemonActor.start(
+ job_queue_poller, arvados_node_poller, cloud_node_poller, timer,
+ config.new_arvados_client, config.new_cloud_client,
+ config.shutdown_windows(), config.getint('Daemon', 'max_nodes'),
+ config.getint('Daemon', 'poll_stale_after'),
+ config.getint('Daemon', 'node_stale_after')).proxy()
+
+ signal.pause()
+ daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
+ while not daemon_stopped():
+ time.sleep(1)
+ pykka.ActorRegistry.stop_all()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/services/nodemanager/arvnodeman/nodelist.py b/services/nodemanager/arvnodeman/nodelist.py
new file mode 100644
index 0000000..dd15dc2
--- /dev/null
+++ b/services/nodemanager/arvnodeman/nodelist.py
@@ -0,0 +1,27 @@
+#!/usr/bin/env python
+
+import arvados.errors as arverror
+
+from . import clientactor
+from . import config
+
+class ArvadosNodeListMonitorActor(clientactor.RemotePollLoopActor):
+ CLIENT_ERRORS = config.ARVADOS_ERRORS
+ LOGGER_NAME = 'arvnodeman.arvados_nodes'
+
+ def _item_key(self, node):
+ return node['uuid']
+
+ def _send_request(self):
+ return self._client.nodes().list(limit=10000).execute()['items']
+
+
+class CloudNodeListMonitorActor(clientactor.RemotePollLoopActor):
+ CLIENT_ERRORS = config.CLOUD_ERRORS
+ LOGGER_NAME = 'arvnodeman.cloud_nodes'
+
+ def _item_key(self, node):
+ return node.id
+
+ def _send_request(self):
+ return self._client.list_nodes()
diff --git a/services/nodemanager/arvnodeman/timedcallback.py b/services/nodemanager/arvnodeman/timedcallback.py
new file mode 100644
index 0000000..ee65a71
--- /dev/null
+++ b/services/nodemanager/arvnodeman/timedcallback.py
@@ -0,0 +1,33 @@
+#!/usr/bin/env python
+
+import heapq
+import time
+
+import pykka
+
+from .config import actor_class
+
+class TimedCallBackActor(actor_class):
+ def __init__(self, max_sleep=1):
+ super(TimedCallBackActor, self).__init__()
+ self._proxy = self.actor_ref.proxy()
+ self.messages = []
+ self.max_sleep = max_sleep
+
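+ # Deliver callbacks at requested times. schedule() pushes onto a heap
+ # keyed by delivery time; deliver() fires the earliest callback once it
+ # is due, otherwise sleeps briefly (at most max_sleep) and re-sends
+ # itself so that newly scheduled earlier callbacks still run promptly.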
+ def schedule(self, delivery_time, receiver, *args, **kwargs):
+ heapq.heappush(self.messages, (delivery_time, receiver, args, kwargs))
+ self._proxy.deliver()
+
+ def deliver(self):
+ if not self.messages:
+ return None
+ til_next = self.messages[0][0] - time.time()
+ if til_next < 0:
+ t, receiver, args, kwargs = heapq.heappop(self.messages)
+ try:
+ receiver(*args, **kwargs)
+ except pykka.ActorDeadError:
+ pass
+ else:
+ time.sleep(min(til_next, self.max_sleep))
+ self._proxy.deliver()
diff --git a/services/nodemanager/bin/arvados-node-manager b/services/nodemanager/bin/arvados-node-manager
new file mode 100644
index 0000000..fba3453
--- /dev/null
+++ b/services/nodemanager/bin/arvados-node-manager
@@ -0,0 +1,4 @@
+#!/usr/bin/env python
+
+from arvnodeman.launcher import main
+main()
diff --git a/services/nodemanager/doc/ec2.example.cfg b/services/nodemanager/doc/ec2.example.cfg
new file mode 100644
index 0000000..b2b2e2d
--- /dev/null
+++ b/services/nodemanager/doc/ec2.example.cfg
@@ -0,0 +1,119 @@
+# EC2 configuration for Arvados Node Manager.
+# All times are in seconds unless specified otherwise.
+
+[Daemon]
+# Node Manager will not start any compute nodes when at least this
+# many are running.
+max_nodes = 8
+
+# Poll EC2 nodes and Arvados for new information every N seconds.
+poll_time = 60
+
+# Polls have exponential backoff when services fail to respond.
+# This is the longest time to wait between polls.
+max_poll_time = 300
+
+# If Node Manager can't successfully poll a service for this long,
+# it will never start or stop compute nodes, on the assumption that its
+# information is too outdated.
+poll_stale_after = 600
+
+# "Node stale time" affects two related behaviors.
+# 1. If a compute node fails to ping an Arvados node for this long,
+# assume that it failed to bootstrap correctly, and consider it eligible
+# for shutdown.
+# This setting is only considered when the compute node is in a normal
+# shutdown window (see below). You probably want to set this so that a
+# shutdown window opens just after time expires.
+# 2. When the Node Manager starts a new compute node, it will try to reuse
+# an Arvados node that hasn't been updated for this long.
+node_stale_after = 3000
+
+# File path for Certificate Authorities
+certs_file = /etc/ssl/certs/ca-certificates.crt
+
+[Logging]
+# Log file path
+file = /var/log/arvados/node-manager.log
+
+# Log level for most Node Manager messages.
+# Choose one of DEBUG, INFO, WARNING, ERROR, or CRITICAL.
+# WARNING lets you know when polling a service fails.
+# INFO additionally lets you know when a compute node is started or stopped.
+level = INFO
+
+# You can also set different log levels for specific libraries.
+# Pykka is the Node Manager's actor library.
+# Setting this to DEBUG will display tracebacks for uncaught
+# exceptions in the actors, but it's also very chatty.
+pykka = WARNING
+
+# Setting apiclient to INFO will log the URL of every Arvados API request.
+apiclient = WARNING
+
+[Arvados]
+host = zyxwv.arvadosapi.com
+token = ARVADOS_TOKEN
+timeout = 15
+
+# Accept an untrusted SSL certificate from the API server?
+insecure = no
+
+[Cloud]
+provider = ec2
+
+# It's usually most cost-effective to shut down compute nodes during narrow
+# windows of time. For example, EC2 bills each node by the hour, so the best
+# time to shut down a node is right before a new hour of uptime starts.
+# Shutdown windows define these periods of time. These are windows in
+# full minutes, separated by commas. Counting from the time the node is
+# booted, the node WILL NOT shut down for N1 minutes; then it MAY shut down
+# for N2 minutes; then it WILL NOT shut down for N3 minutes; and so on.
+# For example, "54, 5, 1" means the node may shut down from the 54th to the
+# 59th minute of each hour of uptime.
+# Specify at least two windows. You can add as many as you need beyond that.
+shutdown_windows = 54, 5, 1
+
+[Cloud Credentials]
+key = KEY
+secret = SECRET_KEY
+region = us-east-1
+
+[Cloud List]
+# This section defines filters that find compute nodes.
+# Tags that you specify here will automatically be added to nodes you create.
+# Replace colons in Amazon filters with underscores
+# (e.g., write "tag:mytag" as "tag_mytag").
+instance-state-name = running
+tag_arvados-class = dynamic-compute
+tag_cluster = zyxwv
+
+[Cloud Create]
+ex_keyname = string
+
+# New compute nodes will send pings to Arvados at this host.
+# You may specify a port, and use brackets to disambiguate IPv6 addresses.
+ping_host = hostname:port
+
+# File path for an SSH key that can log in to the compute node.
+ssh_key = filename
+
+# The EC2 IDs of the image and subnet compute nodes should use.
+image_id = string
+subnet_id = string
+
+# Comma-separated EC2 IDs for the security group(s) assigned to each
+# compute node.
+security_groups = sg1, sg2
+
+[Size t2.medium]
+# You can define any number of Size sections to list EC2 sizes you're
+# willing to use. The Node Manager should boot the cheapest size(s) that
+# can run jobs in the queue (N.B.: defining more than one size has not been
+# tested yet).
+# Each size section MUST define the number of cores it has. You may also
+# want to define the number of mebibytes of scratch space for Crunch jobs.
+# You can also override Amazon's provided data fields by setting the same
+# names here.
+cores = 2
+scratch = 100
\ No newline at end of file
diff --git a/services/nodemanager/doc/local.example.cfg b/services/nodemanager/doc/local.example.cfg
new file mode 100644
index 0000000..33ca3d6
--- /dev/null
+++ b/services/nodemanager/doc/local.example.cfg
@@ -0,0 +1,41 @@
+# You can use this configuration to run a development Node Manager for
+# testing. It uses libcloud's dummy driver and your own development API server.
+# When new cloud nodes are created, you'll need to simulate the ping that
+# they send to the Arvados API server. The easiest way I've found to do that
+# is through the API server Rails console: load the Node object, set its
+# IP address to 10.10.0.N (where N is the cloud node's ID), and save.
+
+[Daemon]
+max_nodes = 8
+poll_time = 15
+max_poll_time = 60
+poll_stale_after = 600
+node_stale_after = 300
+certs_file = /etc/ssl/certs/ca-certificates.crt
+
+[Logging]
+level = DEBUG
+pykka = DEBUG
+apiclient = WARNING
+
+[Arvados]
+host = localhost:3030
+# This is the token for the test fixture's admin user.
+token = 4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h
+insecure = yes
+timeout = 15
+
+[Cloud]
+provider = dummy
+shutdown_windows = 1, 1
+timeout = 15
+
+[Cloud Credentials]
+creds = dummycreds
+
+[Cloud List]
+[Cloud Create]
+
+[Size Medium]
+cores = 4
+scratch = 1234
diff --git a/services/nodemanager/setup.py b/services/nodemanager/setup.py
new file mode 100644
index 0000000..fabb883
--- /dev/null
+++ b/services/nodemanager/setup.py
@@ -0,0 +1,41 @@
+#!/usr/bin/env python
+
+import os
+import subprocess
+import time
+
+from setuptools import setup, find_packages
+
+SETUP_DIR = os.path.dirname(__file__) or "."
+cmd_opts = {'egg_info': {}}
+try:
+ git_tags = subprocess.check_output(
+ ['git', 'log', '--first-parent', '--max-count=1',
+ '--format=format:%ct %h', SETUP_DIR]).split()
+ assert len(git_tags) == 2
+except (AssertionError, OSError, subprocess.CalledProcessError):
+ pass
+else:
+ git_tags[0] = time.strftime('%Y%m%d%H%M%S', time.gmtime(int(git_tags[0])))
+ cmd_opts['egg_info']['tag_build'] = '.{}.{}'.format(*git_tags)
+
+setup(name='arvados-node-manager',
+ version='0.1',
+ description='Arvados compute node manager',
+ author='Arvados',
+ author_email='info@arvados.org',
+ url="https://arvados.org",
+ license='GNU Affero General Public License, version 3.0',
+ packages=find_packages(),
+ install_requires=[
+ 'apache-libcloud',
+ 'arvados-python-client',
+ 'pykka',
+ 'python-daemon',
+ ],
+ scripts=['bin/arvados-node-manager'],
+ test_suite='tests',
+ tests_require=['mock>=1.0'],
+ zip_safe=False,
+ options=cmd_opts,
+ )
diff --git a/services/nodemanager/tests/__init__.py b/services/nodemanager/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/services/nodemanager/tests/test_clientactor.py b/services/nodemanager/tests/test_clientactor.py
new file mode 100644
index 0000000..d09d342
--- /dev/null
+++ b/services/nodemanager/tests/test_clientactor.py
@@ -0,0 +1,125 @@
+#!/usr/bin/env python
+
+import unittest
+
+import mock
+import pykka
+
+import arvnodeman.clientactor as clientactor
+from . import testutil
+
+class RemotePollLoopActorTestCase(testutil.RemotePollLoopActorTestMixin,
+ unittest.TestCase):
+ class MockClientError(Exception):
+ pass
+
+ class TestActor(clientactor.RemotePollLoopActor):
+ LOGGER_NAME = 'arvnodeman.testpoll'
+
+ def _send_request(self):
+ return self._client()
+ TestActor.CLIENT_ERRORS = (MockClientError,)
+ TEST_CLASS = TestActor
+
+
+ def build_monitor(self, side_effect, *args, **kwargs):
+ super(RemotePollLoopActorTestCase, self).build_monitor(*args, **kwargs)
+ self.client.side_effect = side_effect
+
+ def test_poll_loop_starts_after_subscription(self):
+ self.build_monitor(['test1'])
+ self.monitor.subscribe(self.subscriber)
+ self.wait_for_call(self.subscriber)
+ self.subscriber.assert_called_with('test1')
+ self.wait_for_call(self.timer.schedule)
+
+ def test_poll_loop_continues_after_failure(self):
+ self.build_monitor(self.MockClientError)
+ self.monitor.subscribe(self.subscriber)
+ self.wait_for_call(self.timer.schedule)
+ self.assertTrue(self.monitor.actor_ref.is_alive(),
+ "poll loop died after error")
+ self.assertFalse(self.subscriber.called,
+ "poll loop notified subscribers after error")
+
+ def test_late_subscribers_get_responses(self):
+ self.build_monitor(['late_test'])
+ self.monitor.subscribe(lambda response: None)
+ self.monitor.subscribe(self.subscriber)
+ self.monitor.poll()
+ self.wait_for_call(self.subscriber)
+ self.subscriber.assert_called_with('late_test')
+
+ def test_survive_dead_subscriptions(self):
+ self.build_monitor(['survive1', 'survive2'])
+ dead_subscriber = mock.Mock(name='dead_subscriber')
+ dead_subscriber.side_effect = pykka.ActorDeadError
+ self.monitor.subscribe(dead_subscriber)
+ self.wait_for_call(dead_subscriber)
+ self.monitor.subscribe(self.subscriber)
+ self.monitor.poll()
+ self.wait_for_call(self.subscriber)
+ self.subscriber.assert_called_with('survive2')
+ self.assertTrue(self.monitor.actor_ref.is_alive(),
+ "poll loop died from dead subscriber")
+
+ def test_no_subscriptions_by_key_without_support(self):
+ self.build_monitor([])
+ with self.assertRaises(AttributeError):
+ self.monitor.subscribe_to('key')
+
+
+class RemotePollLoopActorWithKeysTestCase(testutil.RemotePollLoopActorTestMixin,
+ unittest.TestCase):
+ class TestActor(RemotePollLoopActorTestCase.TestActor):
+ def _item_key(self, item):
+ return item['key']
+ TEST_CLASS = TestActor
+
+
+ def build_monitor(self, side_effect, *args, **kwargs):
+ super(RemotePollLoopActorWithKeysTestCase, self).build_monitor(
+ *args, **kwargs)
+ self.client.side_effect = side_effect
+
+ def test_key_subscription(self):
+ self.build_monitor([[{'key': 1}, {'key': 2}]])
+ self.monitor.subscribe_to(2, self.subscriber)
+ self.wait_for_call(self.subscriber)
+ self.subscriber.assert_called_with({'key': 2})
+
+ def test_survive_dead_key_subscriptions(self):
+ item = {'key': 3}
+ self.build_monitor([[item], [item]])
+ dead_subscriber = mock.Mock(name='dead_subscriber')
+ dead_subscriber.side_effect = pykka.ActorDeadError
+ self.monitor.subscribe_to(3, dead_subscriber)
+ self.wait_for_call(dead_subscriber)
+ self.monitor.subscribe_to(3, self.subscriber)
+ self.monitor.poll()
+ self.wait_for_call(self.subscriber)
+ self.subscriber.assert_called_with(item)
+ self.assertTrue(self.monitor.actor_ref.is_alive(),
+ "poll loop died from dead key subscriber")
+
+ def test_mixed_subscriptions(self):
+ item = {'key': 4}
+ self.build_monitor([[item], [item]])
+ key_subscriber = mock.Mock(name='key_subscriber')
+ self.monitor.subscribe(self.subscriber)
+ self.monitor.subscribe_to(4, key_subscriber)
+ self.monitor.poll()
+ self.wait_for_call(self.subscriber)
+ self.subscriber.assert_called_with([item])
+ key_subscriber.assert_called_with(item)
+
+ def test_subscription_to_missing_key(self):
+ self.build_monitor([[]])
+ self.monitor.subscribe_to('nonesuch', self.subscriber)
+ self.wait_for_call(self.subscriber)
+ self.subscriber.assert_called_with(None)
+
+
+if __name__ == '__main__':
+ unittest.main()
+
diff --git a/services/nodemanager/tests/test_computenode.py b/services/nodemanager/tests/test_computenode.py
new file mode 100644
index 0000000..e628b13
--- /dev/null
+++ b/services/nodemanager/tests/test_computenode.py
@@ -0,0 +1,255 @@
+#!/usr/bin/env python
+
+import time
+import unittest
+
+import arvados.errors as arverror
+import httplib2
+import mock
+import pykka
+
+import arvnodeman.computenode as cnode
+from . import testutil
+
+class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
+ def make_mocks(self, arvados_effect=None, cloud_effect=None):
+ if arvados_effect is None:
+ arvados_effect = [testutil.arvados_node_mock()]
+ self.timer = testutil.MockTimer()
+ self.api_client = mock.MagicMock(name='api_client')
+ self.api_client.nodes().create().execute.side_effect = arvados_effect
+ self.api_client.nodes().update().execute.side_effect = arvados_effect
+ self.cloud_client = mock.MagicMock(name='cloud_client')
+ self.cloud_client.create_node.return_value = testutil.cloud_node_mock(1)
+
+ def make_actor(self, arv_node=None):
+ if not hasattr(self, 'timer'):
+ self.make_mocks()
+ self.setup_actor = cnode.ComputeNodeSetupActor.start(
+ self.timer, self.api_client, self.cloud_client,
+ testutil.MockSize(1), arv_node).proxy()
+
+ def test_creation_without_arvados_node(self):
+ self.make_actor()
+ self.wait_for_call(self.api_client.nodes().create().execute)
+ self.wait_for_call(self.cloud_client.create_node)
+ self.wait_for_call(self.cloud_client.post_create_node)
+
+ def test_creation_with_arvados_node(self):
+ arv_node = testutil.arvados_node_mock()
+ self.make_mocks([arv_node])
+ self.make_actor(arv_node)
+ self.wait_for_call(self.api_client.nodes().update().execute)
+ self.wait_for_call(self.cloud_client.create_node)
+ self.wait_for_call(self.cloud_client.post_create_node)
+ self.cloud_client.post_create_node.assert_called_with(
+ self.cloud_client.create_node(), arv_node)
+
+ def test_failed_calls_retried(self):
+ self.make_mocks([
+ arverror.ApiError(httplib2.Response({'status': '500'}), ""),
+ testutil.arvados_node_mock(),
+ ])
+ self.make_actor()
+ self.wait_for_call(self.cloud_client.create_node)
+
+ def test_stop_when_no_cloud_node(self):
+ self.make_mocks(
+ arverror.ApiError(httplib2.Response({'status': '500'}), ""))
+ self.make_actor()
+ self.wait_for_call(self.api_client.nodes().create().execute)
+ self.setup_actor.stop_if_no_cloud_node()
+ self.assertTrue(
+ self.setup_actor.actor_ref.actor_stopped.wait(self.TIMEOUT))
+
+ def test_no_stop_when_cloud_node(self):
+ self.make_actor()
+ self.wait_for_call(self.cloud_client.create_node)
+ self.setup_actor.stop_if_no_cloud_node().get(self.TIMEOUT)
+ self.assertFalse(self.setup_actor.actor_ref.actor_stopped.is_set())
+
+ def test_subscribe(self):
+ self.make_mocks(
+ arverror.ApiError(httplib2.Response({'status': '500'}), ""))
+ self.make_actor()
+ subscriber = mock.Mock(name='subscriber_mock')
+ self.setup_actor.subscribe(subscriber)
+ self.api_client.nodes().create().execute.side_effect = [
+ testutil.arvados_node_mock()]
+ self.wait_for_call(subscriber)
+ self.assertEqual(self.setup_actor.actor_ref.actor_urn,
+ subscriber.call_args[0][0].actor_ref.actor_urn)
+
+ def test_late_subscribe(self):
+ self.make_actor()
+ subscriber = mock.Mock(name='subscriber_mock')
+ self.wait_for_call(self.cloud_client.create_node)
+ self.setup_actor.subscribe(subscriber)
+ self.wait_for_call(subscriber)
+ self.assertEqual(self.setup_actor.actor_ref.actor_urn,
+ subscriber.call_args[0][0].actor_ref.actor_urn)
+
+
+class ComputeNodeShutdownActorTestCase(testutil.ActorTestMixin,
+ unittest.TestCase):
+ def make_mocks(self, cloud_node=None):
+ self.timer = testutil.MockTimer()
+ self.cloud_client = mock.MagicMock(name='cloud_client')
+ if cloud_node is None:
+ cloud_node = testutil.cloud_node_mock()
+ self.cloud_node = cloud_node
+
+ def make_actor(self, arv_node=None):
+ if not hasattr(self, 'timer'):
+ self.make_mocks()
+ self.shutdown_actor = cnode.ComputeNodeShutdownActor.start(
+ self.timer, self.cloud_client, self.cloud_node).proxy()
+
+ def test_easy_shutdown(self):
+ self.make_actor()
+ self.wait_for_call(self.cloud_client.destroy_node)
+
+
+@mock.patch('time.time', return_value=1)
+class ShutdownTimerTestCase(unittest.TestCase):
+ def test_two_length_window(self, time_mock):
+ timer = cnode.ShutdownTimer(time_mock.return_value, [8, 2])
+ self.assertEqual(481, timer.next_opening())
+ self.assertFalse(timer.window_open())
+ time_mock.return_value += 500
+ self.assertEqual(1081, timer.next_opening())
+ self.assertTrue(timer.window_open())
+ time_mock.return_value += 200
+ self.assertEqual(1081, timer.next_opening())
+ self.assertFalse(timer.window_open())
+
+ def test_three_length_window(self, time_mock):
+ timer = cnode.ShutdownTimer(time_mock.return_value, [6, 3, 1])
+ self.assertEqual(361, timer.next_opening())
+ self.assertFalse(timer.window_open())
+ time_mock.return_value += 400
+ self.assertEqual(961, timer.next_opening())
+ self.assertTrue(timer.window_open())
+ time_mock.return_value += 200
+ self.assertEqual(961, timer.next_opening())
+ self.assertFalse(timer.window_open())
+
+
+class ComputeNodeActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
+ class MockShutdownTimer(object):
+ def _set_state(self, is_open, next_opening):
+ self.window_open = lambda: is_open
+ self.next_opening = lambda: next_opening
+
+
+ def make_mocks(self, node_num):
+ self.shutdowns = self.MockShutdownTimer()
+ self.shutdowns._set_state(False, 300)
+ self.timer = mock.MagicMock(name='timer_mock')
+ self.cloud_mock = testutil.cloud_node_mock(node_num)
+ self.subscriber = mock.Mock(name='subscriber_mock')
+
+ def make_actor(self, node_num=1, arv_node=None, start_time=None):
+ if not hasattr(self, 'cloud_mock'):
+ self.make_mocks(node_num)
+ if start_time is None:
+ start_time = time.time()
+ self.node_actor = cnode.ComputeNodeActor.start(
+ self.cloud_mock, start_time, self.shutdowns, self.timer,
+ arv_node).proxy()
+ self.node_actor.subscribe(self.subscriber)
+
+ def test_init_shutdown_scheduling(self):
+ self.make_actor()
+ self.wait_for_call(self.timer.schedule)
+ self.assertEqual(300, self.timer.schedule.call_args[0][0])
+
+ def test_shutdown_subscription(self):
+ self.make_actor()
+ self.shutdowns._set_state(True, 600)
+ self.node_actor.consider_shutdown()
+ self.wait_for_call(self.subscriber)
+ self.assertEqual(self.node_actor.actor_ref.actor_urn,
+ self.subscriber.call_args[0][0].actor_ref.actor_urn)
+
+ def test_shutdown_without_arvados_node(self):
+ self.make_actor()
+ self.shutdowns._set_state(True, 600)
+ self.node_actor.consider_shutdown()
+ self.wait_for_call(self.subscriber)
+
+ def test_no_shutdown_without_arvados_node_and_old_cloud_node(self):
+ self.make_actor(start_time=0)
+ self.shutdowns._set_state(True, 600)
+ self.node_actor.consider_shutdown()
+ self.assertFalse(self.subscriber.called)
+
+ def check_shutdown_rescheduled(self, window_open, next_window,
+ schedule_time=None):
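+ # A declined shutdown should be rescheduled with the timer, not
+ # announced to subscribers.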
+ self.shutdowns._set_state(window_open, next_window)
+ self.timer.schedule.reset_mock()
+ self.node_actor.consider_shutdown()
+ self.wait_for_call(self.timer.schedule)
+ if schedule_time is not None:
+ self.assertEqual(schedule_time, self.timer.schedule.call_args[0][0])
+ self.assertFalse(self.subscriber.called)
+
+ def test_shutdown_window_close_scheduling(self):
+ self.make_actor()
+ self.check_shutdown_rescheduled(False, 600, 600)
+
+ def test_no_shutdown_when_node_running_job(self):
+ self.make_actor(4, testutil.arvados_node_mock(4, job_uuid=True))
+ self.check_shutdown_rescheduled(True, 600)
+
+ def test_no_shutdown_when_node_state_unknown(self):
+ self.make_actor(5, testutil.arvados_node_mock(5, info={}))
+ self.check_shutdown_rescheduled(True, 600)
+
+ def test_no_shutdown_when_node_state_stale(self):
+ self.make_actor(6, testutil.arvados_node_mock(6, age=900))
+ self.check_shutdown_rescheduled(True, 600)
+
+ def test_arvados_node_match(self):
+ self.make_actor(2)
+ arv_node = testutil.arvados_node_mock(2)
+ pair_future = self.node_actor.offer_arvados_pair(arv_node)
+ self.assertEqual(self.cloud_mock.id, pair_future.get(self.TIMEOUT))
+
+ def test_arvados_node_mismatch(self):
+ self.make_actor(3)
+ arv_node = testutil.arvados_node_mock(1)
+ pair_future = self.node_actor.offer_arvados_pair(arv_node)
+ self.assertIsNone(pair_future.get(self.TIMEOUT))
+
+ def test_update_cloud_node(self):
+ self.make_actor(1)
+ self.make_mocks(2)
+ self.cloud_mock.id = '1'
+ self.node_actor.update_cloud_node(self.cloud_mock)
+ current_cloud = self.node_actor.cloud_node.get(self.TIMEOUT)
+ self.assertEqual([testutil.ip_address_mock(2)],
+ current_cloud.private_ips)
+
+ def test_missing_cloud_node_update(self):
+ self.make_actor(1)
+ self.node_actor.update_cloud_node(None)
+ current_cloud = self.node_actor.cloud_node.get(self.TIMEOUT)
+ self.assertEqual([testutil.ip_address_mock(1)],
+ current_cloud.private_ips)
+
+ def test_update_arvados_node(self):
+ self.make_actor(3)
+ job_uuid = 'zzzzz-jjjjj-updatejobnode00'
+ new_arvados = testutil.arvados_node_mock(3, job_uuid)
+ self.node_actor.update_arvados_node(new_arvados)
+ current_arvados = self.node_actor.arvados_node.get(self.TIMEOUT)
+ self.assertEqual(job_uuid, current_arvados['job_uuid'])
+
+ def test_missing_arvados_node_update(self):
+ self.make_actor(4, testutil.arvados_node_mock(4))
+ self.node_actor.update_arvados_node(None)
+ current_arvados = self.node_actor.arvados_node.get(self.TIMEOUT)
+ self.assertEqual(testutil.ip_address_mock(4),
+ current_arvados['ip_address'])
diff --git a/services/nodemanager/tests/test_computenode_ec2.py b/services/nodemanager/tests/test_computenode_ec2.py
new file mode 100644
index 0000000..20f1d1d
--- /dev/null
+++ b/services/nodemanager/tests/test_computenode_ec2.py
@@ -0,0 +1,70 @@
+#!/usr/bin/env python
+
+import time
+import unittest
+
+import mock
+
+import arvnodeman.computenode.ec2 as ec2
+from . import testutil
+
+class EC2ComputeNodeDriverTestCase(unittest.TestCase):
+ def setUp(self):
+ self.driver_mock = mock.MagicMock(name='driver_mock')
+
+ def new_driver(self, auth_kwargs={}, list_kwargs={}, create_kwargs={}):
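+ # Note: the mutable default dicts are shared between calls (a
+ # classic Python gotcha), but that's harmless here: setdefault
+ # only ever stores the same ping_host value.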
+ create_kwargs.setdefault('ping_host', '100::')
+ return ec2.ComputeNodeDriver(
+ auth_kwargs, list_kwargs, create_kwargs,
+ driver_class=self.driver_mock)
+
+ def test_driver_instantiation(self):
+ kwargs = {'key': 'testkey'}
+ driver = self.new_driver(auth_kwargs=kwargs)
+ self.assertTrue(self.driver_mock.called)
+ self.assertEqual(kwargs, self.driver_mock.call_args[1])
+
+ def test_list_kwargs_become_filters(self):
+ # This also exercises tag name translation: tag_test -> tag:test.
+ driver = self.new_driver(list_kwargs={'tag_test': 'true'})
+ driver.list_nodes()
+ list_method = self.driver_mock().list_nodes
+ self.assertTrue(list_method.called)
+ self.assertEqual({'tag:test': 'true'},
+ list_method.call_args[1].get('ex_filters'))
+
+ def test_create_location_loaded_at_initialization(self):
+ kwargs = {'location': 'testregion'}
+ driver = self.new_driver(create_kwargs=kwargs)
+ self.assertTrue(self.driver_mock().list_locations.called)
+
+ def test_create_image_loaded_at_initialization(self):
+ kwargs = {'image': 'testimage'}
+ driver = self.new_driver(create_kwargs=kwargs)
+ self.assertTrue(self.driver_mock().list_images.called)
+
+ def test_create_includes_ping_secret(self):
+ arv_node = testutil.arvados_node_mock(info={'ping_secret': 'ssshh'})
+ driver = self.new_driver()
+ driver.create_node(testutil.MockSize(1), arv_node)
+ create_method = self.driver_mock().create_node
+ self.assertTrue(create_method.called)
+ self.assertIn('ping_secret=ssshh',
+ create_method.call_args[1].get('ex_userdata',
+ 'arg missing'))
+
+ def test_post_create_tags_node(self):
+ arv_node = testutil.arvados_node_mock(8)
+ cloud_node = testutil.cloud_node_mock(8)
+ driver = self.new_driver(list_kwargs={'tag:list': 'test'})
+ self.assertEqual({'ex_metadata': {'list': 'test'},
+ 'name': 'compute8.zzzzz.arvadosapi.com'},
+ driver.arvados_create_kwargs(arv_node))
+
+ def test_node_create_time(self):
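+ # node_start_time should turn the ISO 8601 launch_time string EC2
+ # stores in node.extra back into seconds since the epoch.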
+ refsecs = int(time.time())
+ reftuple = time.gmtime(refsecs)
+ node = testutil.cloud_node_mock()
+ node.extra = {'launch_time': time.strftime('%Y-%m-%dT%H:%M:%S.000Z',
+ reftuple)}
+ self.assertEqual(refsecs, ec2.ComputeNodeDriver.node_start_time(node))
diff --git a/services/nodemanager/tests/test_config.py b/services/nodemanager/tests/test_config.py
new file mode 100644
index 0000000..f726e6e
--- /dev/null
+++ b/services/nodemanager/tests/test_config.py
@@ -0,0 +1,63 @@
+#!/usr/bin/env python
+
+import io
+import logging
+import unittest
+
+import arvnodeman.config as nmconfig
+
+class NodeManagerConfigTestCase(unittest.TestCase):
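+ # The u prefix matters: this string is fed to io.StringIO below,
+ # which only accepts unicode on Python 2.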
+ TEST_CONFIG = u"""
+[Cloud]
+provider = dummy
+shutdown_windows = 52, 6, 2
+
+[Cloud Credentials]
+creds = dummy_creds
+
+[Cloud List]
+[Cloud Create]
+
+[Size 1]
+cores = 1
+
+[Logging]
+file = /dev/null
+level = DEBUG
+testlogger = INFO
+"""
+
+ def load_config(self, config=None, config_str=None):
+ if config is None:
+ config = nmconfig.NodeManagerConfig()
+ if config_str is None:
+ config_str = self.TEST_CONFIG
+ with io.StringIO(config_str) as config_fp:
+ config.readfp(config_fp)
+ return config
+
+ def test_seeded_defaults(self):
+ config = nmconfig.NodeManagerConfig()
+ sec_names = set(config.sections())
+ self.assertIn('Arvados', sec_names)
+ self.assertIn('Daemon', sec_names)
+ self.assertFalse(any(name.startswith('Size ') for name in sec_names))
+
+ def test_list_sizes(self):
+ config = self.load_config()
+ client = config.new_cloud_client()
+ sizes = config.node_sizes(client.list_sizes())
+ self.assertEqual(1, len(sizes))
+ size, kwargs = sizes[0]
+ self.assertEqual('Small', size.name)
+ self.assertEqual(1, kwargs['cores'])
+
+ def test_shutdown_windows(self):
+ config = self.load_config()
+ self.assertEqual([52, 6, 2], config.shutdown_windows())
+
+ def test_log_levels(self):
+ config = self.load_config()
+ self.assertEqual({'level': logging.DEBUG,
+ 'testlogger': logging.INFO},
+ config.log_levels())
diff --git a/services/nodemanager/tests/test_daemon.py b/services/nodemanager/tests/test_daemon.py
new file mode 100644
index 0000000..28be5fa
--- /dev/null
+++ b/services/nodemanager/tests/test_daemon.py
@@ -0,0 +1,138 @@
+#!/usr/bin/env python
+
+import time
+import unittest
+
+import mock
+
+import arvnodeman.daemon as nmdaemon
+from . import testutil
+
+class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
+ unittest.TestCase):
+ def make_daemon(self, cloud_nodes=[], arvados_nodes=[], want_sizes=[]):
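+ # Build mock pollers (cloud_nodes_poller, arvados_nodes_poller,
+ # server_wishlist_poller) and mock factories, then start a daemon
+ # wired to them.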
+ for name in ['cloud_nodes', 'arvados_nodes', 'server_wishlist']:
+ setattr(self, name + '_poller', mock.MagicMock(name=name + '_mock'))
+ self.arv_factory = mock.MagicMock(name='arvados_mock')
+ self.cloud_factory = mock.MagicMock(name='cloud_mock')
+ self.cloud_factory().node_start_time.return_value = time.time()
+ self.timer = testutil.MockTimer()
+ self.node_factory = mock.MagicMock(name='factory_mock')
+ self.node_setup = mock.MagicMock(name='setup_mock')
+ self.node_shutdown = mock.MagicMock(name='shutdown_mock')
+ self.daemon = nmdaemon.NodeManagerDaemonActor.start(
+ self.server_wishlist_poller, self.arvados_nodes_poller,
+ self.cloud_nodes_poller, self.timer,
+ self.arv_factory, self.cloud_factory,
+ [54, 5, 1], 8, 600, 3600,
+ self.node_setup, self.node_shutdown, self.node_factory).proxy()
+ if cloud_nodes is not None:
+ self.daemon.update_cloud_nodes(cloud_nodes)
+ if arvados_nodes is not None:
+ self.daemon.update_arvados_nodes(arvados_nodes)
+ if want_sizes is not None:
+ self.daemon.update_server_wishlist(want_sizes)
+
+ def test_easy_node_creation(self):
+ size = testutil.MockSize(1)
+ self.make_daemon(want_sizes=[size])
+ self.wait_for_call(self.node_setup.start)
+
+ def test_node_pairing(self):
+ cloud_node = testutil.cloud_node_mock(1)
+ arv_node = testutil.arvados_node_mock(1)
+ self.make_daemon([cloud_node], [arv_node])
+ self.wait_for_call(self.node_factory.start)
+ pair_func = self.node_factory.start().proxy().offer_arvados_pair
+ self.wait_for_call(pair_func)
+ pair_func.assert_called_with(arv_node)
+
+ def test_node_pairing_after_arvados_update(self):
+ cloud_node = testutil.cloud_node_mock(2)
+ arv_node = testutil.arvados_node_mock(2, ip_address=None)
+ self.make_daemon([cloud_node], None)
+ pair_func = self.node_factory.start().proxy().offer_arvados_pair
+ pair_func().get.return_value = None
+ self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
+ pair_func.assert_called_with(arv_node)
+
+ pair_func().get.return_value = cloud_node.id
+ pair_func.reset_mock()
+ arv_node = testutil.arvados_node_mock(2)
+ self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
+ pair_func.assert_called_with(arv_node)
+
+ def test_old_arvados_node_not_double_assigned(self):
+ arv_node = testutil.arvados_node_mock(3, age=9000)
+ size = testutil.MockSize(3)
+ self.make_daemon(arvados_nodes=[arv_node], want_sizes=[size, size])
+ node_starter = self.node_setup.start
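+ # Poll until both node setups have started, or give up at the
+ # deadline; the assertion below catches a timeout.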
+ deadline = time.time() + self.TIMEOUT
+ while (time.time() < deadline) and (node_starter.call_count < 2):
+ time.sleep(.1)
+ self.assertEqual(2, node_starter.call_count)
+ used_nodes = [call[1].get('arvados_node')
+ for call in node_starter.call_args_list]
+ self.assertIn(arv_node, used_nodes)
+ self.assertIn(None, used_nodes)
+
+ def test_node_count_satisfied(self):
+ self.make_daemon([testutil.cloud_node_mock()])
+ self.daemon.update_server_wishlist(
+ [testutil.MockSize(1)]).get(self.TIMEOUT)
+ self.assertFalse(self.node_setup.called)
+
+ def test_booting_nodes_counted(self):
+ cloud_node = testutil.cloud_node_mock(1)
+ arv_node = testutil.arvados_node_mock(1)
+ server_wishlist = [testutil.MockSize(1)] * 2
+ self.make_daemon([cloud_node], [arv_node], server_wishlist)
+ self.wait_for_call(self.node_setup.start)
+ self.node_setup.reset_mock()
+ self.daemon.update_server_wishlist(server_wishlist).get(self.TIMEOUT)
+ self.assertFalse(self.node_setup.called)
+
+ def test_booting_nodes_shut_down(self):
+ self.make_daemon(want_sizes=[testutil.MockSize(1)])
+ self.wait_for_call(self.node_setup.start)
+ self.daemon.update_server_wishlist([])
+ self.wait_for_call(
+ self.node_setup.start().proxy().stop_if_no_cloud_node)
+
+ def test_shutdown_declined_at_wishlist_capacity(self):
+ cloud_node = testutil.cloud_node_mock(1)
+ size = testutil.MockSize(1)
+ self.make_daemon(cloud_nodes=[cloud_node], want_sizes=[size])
+ node_actor = self.node_factory().proxy()
+ self.daemon.shutdown_offer(node_actor).get(self.TIMEOUT)
+ self.assertFalse(node_actor.shutdown.called)
+
+ def test_shutdown_accepted_below_capacity(self):
+ self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
+ node_actor = self.node_factory().proxy()
+ self.daemon.shutdown_offer(node_actor)
+ self.wait_for_call(self.node_shutdown.start)
+
+ def test_clean_shutdown_waits_for_node_setup_finish(self):
+ self.make_daemon(want_sizes=[testutil.MockSize(1)])
+ self.wait_for_call(self.node_setup.start)
+ new_node = self.node_setup.start().proxy()
+ self.daemon.shutdown()
+ self.wait_for_call(new_node.stop_if_no_cloud_node)
+ self.daemon.node_up(new_node)
+ self.wait_for_call(new_node.stop)
+ self.assertTrue(
+ self.daemon.actor_ref.actor_stopped.wait(self.TIMEOUT))
+
+ def test_wishlist_ignored_after_shutdown(self):
+ size = testutil.MockSize(2)
+ self.make_daemon(want_sizes=[size])
+ node_starter = self.node_setup.start
+ self.wait_for_call(node_starter)
+ node_starter.reset_mock()
+ self.daemon.shutdown()
+ self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
+ # Send another message and wait for a response, to make sure all
+ # internal messages generated by the wishlist update are processed.
+ self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
+ self.assertFalse(node_starter.called)
diff --git a/services/nodemanager/tests/test_jobqueue.py b/services/nodemanager/tests/test_jobqueue.py
new file mode 100644
index 0000000..0ff9ebb
--- /dev/null
+++ b/services/nodemanager/tests/test_jobqueue.py
@@ -0,0 +1,72 @@
+#!/usr/bin/env python
+
+import unittest
+
+import arvnodeman.jobqueue as jobqueue
+from . import testutil
+
+class ServerCalculatorTestCase(unittest.TestCase):
+ def make_calculator(self, factors, **kwargs):
+ return jobqueue.ServerCalculator(
+ [(testutil.MockSize(n), {'cores': n}) for n in factors], **kwargs)
+
+ def calculate(self, servcalc, *constraints):
+ return servcalc.servers_for_queue(
+ [{'runtime_constraints': cdict} for cdict in constraints])
+
+ def test_empty_queue_needs_no_servers(self):
+ servcalc = self.make_calculator([1])
+ self.assertEqual([], servcalc.servers_for_queue([]))
+
+ def test_easy_server_count(self):
+ servcalc = self.make_calculator([1])
+ servlist = self.calculate(servcalc, {'min_nodes': 3})
+ self.assertEqual(3, len(servlist))
+
+ def test_implicit_server_count(self):
+ servcalc = self.make_calculator([1])
+ servlist = self.calculate(servcalc, {}, {'min_nodes': 3})
+ self.assertEqual(4, len(servlist))
+
+ def test_bad_min_nodes_override(self):
+ servcalc = self.make_calculator([1])
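+ # Values that aren't positive integers should fall back to the
+ # default of one node per job, giving two servers here.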
+ servlist = self.calculate(servcalc,
+ {'min_nodes': -2}, {'min_nodes': 'foo'})
+ self.assertEqual(2, len(servlist))
+
+ def test_ignore_unsatisfiable_jobs(self):
+ servcalc = self.make_calculator([1], max_nodes=9)
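+ # make_calculator([1]) offers only MockSize(1): one core, 128 MB
+ # RAM, 100 disk. Every constraint below except {'min_nodes': 6} is
+ # unsatisfiable ({'min_nodes': 12} exceeds max_nodes=9), so the
+ # queue should yield exactly six servers.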
+ servlist = self.calculate(servcalc,
+ {'min_cores_per_node': 2},
+ {'min_ram_mb_per_node': 256},
+ {'min_nodes': 6},
+ {'min_nodes': 12},
+ {'min_scratch_mb_per_node': 200})
+ self.assertEqual(6, len(servlist))
+
+
+class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
+ unittest.TestCase):
+ TEST_CLASS = jobqueue.JobQueueMonitorActor
+
+ class MockCalculator(object):
+ @staticmethod
+ def servers_for_queue(queue):
+ return [testutil.MockSize(n) for n in queue]
+
+
+ def build_monitor(self, side_effect, *args, **kwargs):
+ super(JobQueueMonitorActorTestCase, self).build_monitor(*args, **kwargs)
+ self.client.jobs().queue().execute.side_effect = side_effect
+
+ def test_subscribers_get_server_lists(self):
+ self.build_monitor([{'items': [1, 2]}], self.MockCalculator())
+ self.monitor.subscribe(self.subscriber)
+ self.wait_for_call(self.subscriber)
+ self.subscriber.assert_called_with([testutil.MockSize(1),
+ testutil.MockSize(2)])
+
+
+if __name__ == '__main__':
+ unittest.main()
+
diff --git a/services/nodemanager/tests/test_nodelist.py b/services/nodemanager/tests/test_nodelist.py
new file mode 100644
index 0000000..f4a3321
--- /dev/null
+++ b/services/nodemanager/tests/test_nodelist.py
@@ -0,0 +1,54 @@
+#!/usr/bin/env python
+
+import unittest
+
+import arvnodeman.nodelist as nodelist
+from . import testutil
+
+class ArvadosNodeListMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
+ unittest.TestCase):
+ TEST_CLASS = nodelist.ArvadosNodeListMonitorActor
+
+ def build_monitor(self, side_effect, *args, **kwargs):
+ super(ArvadosNodeListMonitorActorTestCase, self).build_monitor(
+ *args, **kwargs)
+ self.client.nodes().list().execute.side_effect = side_effect
+
+ def test_uuid_is_subscription_key(self):
+ node = testutil.arvados_node_mock()
+ self.build_monitor([{'items': [node]}])
+ self.monitor.subscribe_to(node['uuid'], self.subscriber)
+ self.wait_for_call(self.subscriber)
+ self.subscriber.assert_called_with(node)
+
+
+class CloudNodeListMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
+ unittest.TestCase):
+ TEST_CLASS = nodelist.CloudNodeListMonitorActor
+
+ class MockNode(object):
+ def __init__(self, count):
+ self.id = str(count)
+ self.name = 'test{}.example.com'.format(count)
+ self.private_ips = ['10.0.0.{}'.format(count)]
+ self.public_ips = []
+ self.size = None
+ self.state = 0
+
+
+ def build_monitor(self, side_effect, *args, **kwargs):
+ super(CloudNodeListMonitorActorTestCase, self).build_monitor(
+ *args, **kwargs)
+ self.client.list_nodes.side_effect = side_effect
+
+ def test_id_is_subscription_key(self):
+ node = self.MockNode(1)
+ self.build_monitor([[node]])
+ self.monitor.subscribe_to('1', self.subscriber)
+ self.wait_for_call(self.subscriber)
+ self.subscriber.assert_called_with(node)
+
+
+if __name__ == '__main__':
+ unittest.main()
+
diff --git a/services/nodemanager/tests/test_timedcallback.py b/services/nodemanager/tests/test_timedcallback.py
new file mode 100644
index 0000000..7c4c43b
--- /dev/null
+++ b/services/nodemanager/tests/test_timedcallback.py
@@ -0,0 +1,53 @@
+#!/usr/bin/env python
+
+import time
+import unittest
+
+import mock
+import pykka
+
+import arvnodeman.timedcallback as timedcallback
+from . import testutil
+
+@testutil.no_sleep
+class TimedCallBackActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
+ def test_immediate_turnaround(self):
+ future = self.FUTURE_CLASS()
+ deliverer = timedcallback.TimedCallBackActor.start().proxy()
+ deliverer.schedule(time.time() - 1, future.set, 'immediate')
+ self.assertEqual('immediate', future.get(self.TIMEOUT))
+
+ def test_delayed_turnaround(self):
+ future = self.FUTURE_CLASS()
+ with mock.patch('time.time', return_value=0) as mock_now:
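+ # With the clock frozen at 0, a callback scheduled for t=1 must
+ # not fire; advancing the mocked time past 1 releases it.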
+ deliverer = timedcallback.TimedCallBackActor.start().proxy()
+ deliverer.schedule(1, future.set, 'delayed')
+ self.assertRaises(pykka.Timeout, future.get, .5)
+ mock_now.return_value = 2
+ self.assertEqual('delayed', future.get(self.TIMEOUT))
+
+ def test_out_of_order_scheduling(self):
+ future1 = self.FUTURE_CLASS()
+ future2 = self.FUTURE_CLASS()
+ with mock.patch('time.time', return_value=1.5) as mock_now:
+ deliverer = timedcallback.TimedCallBackActor.start().proxy()
+ deliverer.schedule(2, future2.set, 'second')
+ deliverer.schedule(1, future1.set, 'first')
+ self.assertEqual('first', future1.get(self.TIMEOUT))
+ self.assertRaises(pykka.Timeout, future2.get, .1)
+ mock_now.return_value = 3
+ self.assertEqual('second', future2.get(self.TIMEOUT))
+
+ def test_dead_actors_ignored(self):
+ receiver = mock.Mock(name='dead_actor', spec=pykka.ActorRef)
+ receiver.tell.side_effect = pykka.ActorDeadError
+ deliverer = timedcallback.TimedCallBackActor.start().proxy()
+ deliverer.schedule(time.time() - 1, receiver.tell, 'error')
+ self.wait_for_call(receiver.tell)
+ receiver.tell.assert_called_with('error')
+ self.assertTrue(deliverer.actor_ref.is_alive(), "deliverer died")
+
+
+if __name__ == '__main__':
+ unittest.main()
+
diff --git a/services/nodemanager/tests/testutil.py b/services/nodemanager/tests/testutil.py
new file mode 100644
index 0000000..d67cf1c
--- /dev/null
+++ b/services/nodemanager/tests/testutil.py
@@ -0,0 +1,79 @@
+#!/usr/bin/env python
+
+import time
+
+import mock
+import pykka
+
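+# Patches time.sleep to a no-op so tests with polling loops run
+# without real delays.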
+no_sleep = mock.patch('time.sleep', lambda n: None)
+
+def arvados_node_mock(node_num=99, job_uuid=None, age=0, **kwargs):
+ if job_uuid is True:
+ job_uuid = 'zzzzz-jjjjj-jobjobjobjobjob'
+ slurm_state = 'idle' if (job_uuid is None) else 'alloc'
+ node = {'uuid': 'zzzzz-yyyyy-12345abcde67890',
+ 'created_at': '2014-01-01T01:02:03Z',
+ 'modified_at': time.strftime('%Y-%m-%dT%H:%M:%SZ',
+ time.gmtime(time.time() - age)),
+ 'hostname': 'compute{}'.format(node_num),
+ 'domain': 'zzzzz.arvadosapi.com',
+ 'ip_address': ip_address_mock(node_num),
+ 'job_uuid': job_uuid,
+ 'info': {'slurm_state': slurm_state}}
+ node.update(kwargs)
+ return node
+
+def cloud_node_mock(node_num=99):
+ node = mock.NonCallableMagicMock(
+ ['id', 'name', 'state', 'public_ips', 'private_ips', 'driver', 'size',
+ 'image', 'extra'],
+ name='cloud_node')
+ node.id = str(node_num)
+ node.name = node.id
+ node.public_ips = []
+ node.private_ips = [ip_address_mock(node_num)]
+ return node
+
+def ip_address_mock(last_octet):
+ return '10.20.30.{}'.format(last_octet)
+
+class MockSize(object):
+ def __init__(self, factor):
+ self.id = 'z{}.test'.format(factor)
+ self.name = self.id
+ self.ram = 128 * factor
+ self.disk = 100 * factor
+ self.bandwidth = 16 * factor
+ self.price = float(factor)
+ self.extra = {}
+
+ def __eq__(self, other):
+ return self.id == other.id
+
+
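+# Stand-in for the timer actor: runs callbacks immediately instead
+# of scheduling them for later.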
+class MockTimer(object):
+ def schedule(self, want_time, callback, *args, **kwargs):
+ return callback(*args, **kwargs)
+
+
+class ActorTestMixin(object):
+ FUTURE_CLASS = pykka.ThreadingFuture
+ TIMEOUT = 3
+
+ def tearDown(self):
+ pykka.ActorRegistry.stop_all()
+
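+ # Poll until the mock has been called, or fail after the timeout.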
+ def wait_for_call(self, mock_func, timeout=TIMEOUT):
+ deadline = time.time() + timeout
+ while (not mock_func.called) and (time.time() < deadline):
+ time.sleep(.1)
+ self.assertTrue(mock_func.called, "{} not called".format(mock_func))
+
+
+class RemotePollLoopActorTestMixin(ActorTestMixin):
+ def build_monitor(self, *args, **kwargs):
+ self.timer = mock.MagicMock(name='timer_mock')
+ self.client = mock.MagicMock(name='client_mock')
+ self.subscriber = mock.Mock(name='subscriber_mock')
+ self.monitor = self.TEST_CLASS.start(
+ self.client, self.timer, *args, **kwargs).proxy()