[ARVADOS] updated: 44e01cf266a3c062b2f0f5bb3426672024367d38
git at public.curoverse.com
Wed Oct 8 16:52:34 EDT 2014
Summary of changes:
services/nodemanager/.gitignore | 4 +
services/nodemanager/arvnodeman/__init__.py | 9 +
services/nodemanager/arvnodeman/clientactor.py | 96 ++++++
.../nodemanager/arvnodeman/computenode/__init__.py | 383 +++++++++++++++++++++
.../nodemanager/arvnodeman/computenode/dummy.py | 52 +++
services/nodemanager/arvnodeman/computenode/ec2.py | 101 ++++++
services/nodemanager/arvnodeman/config.py | 108 ++++++
services/nodemanager/arvnodeman/daemon.py | 294 ++++++++++++++++
services/nodemanager/arvnodeman/jobqueue.py | 96 ++++++
services/nodemanager/arvnodeman/launcher.py | 130 +++++++
services/nodemanager/arvnodeman/nodelist.py | 39 +++
services/nodemanager/arvnodeman/timedcallback.py | 41 +++
services/nodemanager/bin/arvados-node-manager | 6 +
services/nodemanager/doc/ec2.example.cfg | 121 +++++++
services/nodemanager/doc/local.example.cfg | 41 +++
services/{fuse => nodemanager}/setup.py | 26 +-
services/{fuse => nodemanager}/tests/__init__.py | 0
services/nodemanager/tests/test_clientactor.py | 127 +++++++
services/nodemanager/tests/test_computenode.py | 272 +++++++++++++++
services/nodemanager/tests/test_computenode_ec2.py | 89 +++++
services/nodemanager/tests/test_config.py | 65 ++++
services/nodemanager/tests/test_daemon.py | 158 +++++++++
services/nodemanager/tests/test_jobqueue.py | 74 ++++
services/nodemanager/tests/test_nodelist.py | 56 +++
services/nodemanager/tests/test_timedcallback.py | 55 +++
services/nodemanager/tests/testutil.py | 81 +++++
26 files changed, 2508 insertions(+), 16 deletions(-)
create mode 100644 services/nodemanager/.gitignore
create mode 100644 services/nodemanager/arvnodeman/__init__.py
create mode 100644 services/nodemanager/arvnodeman/clientactor.py
create mode 100644 services/nodemanager/arvnodeman/computenode/__init__.py
create mode 100644 services/nodemanager/arvnodeman/computenode/dummy.py
create mode 100644 services/nodemanager/arvnodeman/computenode/ec2.py
create mode 100644 services/nodemanager/arvnodeman/config.py
create mode 100644 services/nodemanager/arvnodeman/daemon.py
create mode 100644 services/nodemanager/arvnodeman/jobqueue.py
create mode 100644 services/nodemanager/arvnodeman/launcher.py
create mode 100644 services/nodemanager/arvnodeman/nodelist.py
create mode 100644 services/nodemanager/arvnodeman/timedcallback.py
create mode 100644 services/nodemanager/bin/arvados-node-manager
create mode 100644 services/nodemanager/doc/ec2.example.cfg
create mode 100644 services/nodemanager/doc/local.example.cfg
copy services/{fuse => nodemanager}/setup.py (63%)
copy services/{fuse => nodemanager}/tests/__init__.py (100%)
create mode 100644 services/nodemanager/tests/test_clientactor.py
create mode 100644 services/nodemanager/tests/test_computenode.py
create mode 100644 services/nodemanager/tests/test_computenode_ec2.py
create mode 100644 services/nodemanager/tests/test_config.py
create mode 100644 services/nodemanager/tests/test_daemon.py
create mode 100644 services/nodemanager/tests/test_jobqueue.py
create mode 100644 services/nodemanager/tests/test_nodelist.py
create mode 100644 services/nodemanager/tests/test_timedcallback.py
create mode 100644 services/nodemanager/tests/testutil.py
via 44e01cf266a3c062b2f0f5bb3426672024367d38 (commit)
via a5687a390262abebfc16cf21e62052ac0019512d (commit)
from 4be23b41ec561b404dd833bdbea9d764f2b5d027 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 44e01cf266a3c062b2f0f5bb3426672024367d38
Merge: 4be23b4 a5687a3
Author: Brett Smith <brett at curoverse.com>
Date: Wed Oct 8 16:52:44 2014 -0400
Merge branch '2881-node-manager'
Closes #2881, #4106.
commit a5687a390262abebfc16cf21e62052ac0019512d
Author: Brett Smith <brett at curoverse.com>
Date: Fri Oct 3 17:53:57 2014 -0400
2881: Add Node Manager service.
diff --git a/services/nodemanager/.gitignore b/services/nodemanager/.gitignore
new file mode 100644
index 0000000..488ddd5
--- /dev/null
+++ b/services/nodemanager/.gitignore
@@ -0,0 +1,4 @@
+*.pyc
+*.egg-info
+build/
+dist/
diff --git a/services/nodemanager/arvnodeman/__init__.py b/services/nodemanager/arvnodeman/__init__.py
new file mode 100644
index 0000000..a1ecac7
--- /dev/null
+++ b/services/nodemanager/arvnodeman/__init__.py
@@ -0,0 +1,9 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import _strptime # See <http://bugs.python.org/issue7980#msg221094>.
+import logging
+
+logger = logging.getLogger('arvnodeman')
+logger.addHandler(logging.NullHandler())
diff --git a/services/nodemanager/arvnodeman/clientactor.py b/services/nodemanager/arvnodeman/clientactor.py
new file mode 100644
index 0000000..77d85d6
--- /dev/null
+++ b/services/nodemanager/arvnodeman/clientactor.py
@@ -0,0 +1,96 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import logging
+import time
+
+import pykka
+
+from .config import actor_class
+
+def _notify_subscribers(response, subscribers):
+ """Send the response to all the subscriber methods.
+
+ If any of the subscriber actors have stopped, remove them from the
+ subscriber set.
+ """
+ dead_subscribers = set()
+ for subscriber in subscribers:
+ try:
+ subscriber(response)
+ except pykka.ActorDeadError:
+ dead_subscribers.add(subscriber)
+ subscribers.difference_update(dead_subscribers)
+
+class RemotePollLoopActor(actor_class):
+ """Abstract actor class to regularly poll a remote service.
+
+ This actor sends regular requests to a remote service, and sends each
+ response to subscribers. It takes care of error handling, and retrying
+ requests with exponential backoff.
+
+ To use this actor, define CLIENT_ERRORS and the _send_request method.
+ If you also define an _item_key method, this class will support
+ subscribing to a specific item by key in responses.
+ """
+ def __init__(self, client, timer_actor, poll_wait=60, max_poll_wait=180):
+ super(RemotePollLoopActor, self).__init__()
+ self._client = client
+ self._timer = timer_actor
+ self._logger = logging.getLogger(self.LOGGER_NAME)
+ self._later = self.actor_ref.proxy()
+ self.min_poll_wait = poll_wait
+ self.max_poll_wait = max_poll_wait
+ self.poll_wait = self.min_poll_wait
+ self.last_poll_time = None
+ self.all_subscribers = set()
+ self.key_subscribers = {}
+ if hasattr(self, '_item_key'):
+ self.subscribe_to = self._subscribe_to
+
+ def _start_polling(self):
+ if self.last_poll_time is None:
+ self.last_poll_time = time.time()
+ self._later.poll()
+
+ def subscribe(self, subscriber):
+ self.all_subscribers.add(subscriber)
+ self._logger.debug("%r subscribed to all events", subscriber)
+ self._start_polling()
+
+ # __init__ exposes this method to the proxy if the subclass defines
+ # _item_key.
+ def _subscribe_to(self, key, subscriber):
+ self.key_subscribers.setdefault(key, set()).add(subscriber)
+ self._logger.debug("%r subscribed to events for '%s'", subscriber, key)
+ self._start_polling()
+
+ def _send_request(self):
+ raise NotImplementedError("subclasses must implement request method")
+
+ def _got_response(self, response):
+ self.poll_wait = self.min_poll_wait
+ _notify_subscribers(response, self.all_subscribers)
+ if hasattr(self, '_item_key'):
+ items = {self._item_key(x): x for x in response}
+ for key, subscribers in self.key_subscribers.iteritems():
+ _notify_subscribers(items.get(key), subscribers)
+
+ def _got_error(self, error):
+ self.poll_wait = min(self.poll_wait * 2, self.max_poll_wait)
+ self._logger.warning("Client error: %s - waiting %s seconds",
+ error, self.poll_wait)
+
+ def poll(self):
+ start_time = time.time()
+ try:
+ response = self._send_request()
+ except self.CLIENT_ERRORS as error:
+ self.last_poll_time = start_time
+ self._got_error(error)
+ else:
+ self.last_poll_time += self.poll_wait
+ self._got_response(response)
+ self._timer.schedule(self.last_poll_time + self.poll_wait,
+ self._later.poll)
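
Usage note: the subclassing contract described in RemotePollLoopActor's
docstring amounts to defining CLIENT_ERRORS, LOGGER_NAME, and _send_request
(plus _item_key if you want keyed subscriptions). A minimal sketch follows;
the actor name, logger name, and the assumption that the client is an
Arvados API client are illustrative only — the real subclasses are in
nodelist.py and jobqueue.py below:

    from arvnodeman.clientactor import RemotePollLoopActor
    from arvnodeman.config import ARVADOS_ERRORS

    class ExamplePollActor(RemotePollLoopActor):
        CLIENT_ERRORS = ARVADOS_ERRORS
        LOGGER_NAME = 'arvnodeman.example'  # invented name

        def _item_key(self, item):
            # Defining _item_key exposes subscribe_to(key, subscriber).
            return item['uuid']

        def _send_request(self):
            # Whatever this returns is sent to every subscriber.
            return self._client.nodes().list().execute()['items']
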
diff --git a/services/nodemanager/arvnodeman/computenode/__init__.py b/services/nodemanager/arvnodeman/computenode/__init__.py
new file mode 100644
index 0000000..ae25428
--- /dev/null
+++ b/services/nodemanager/arvnodeman/computenode/__init__.py
@@ -0,0 +1,383 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import functools
+import itertools
+import logging
+import time
+
+import pykka
+
+from ..clientactor import _notify_subscribers
+from .. import config
+
+def arvados_node_fqdn(arvados_node, default_hostname='dynamic.compute'):
+ hostname = arvados_node.get('hostname') or default_hostname
+ return '{}.{}'.format(hostname, arvados_node['domain'])
+
+def arvados_node_mtime(node):
+ return time.mktime(time.strptime(node['modified_at'] + 'UTC',
+ '%Y-%m-%dT%H:%M:%SZ%Z')) - time.timezone
+
+def timestamp_fresh(timestamp, fresh_time):
+ return (time.time() - timestamp) < fresh_time
+
+def _retry(errors):
+ """Retry decorator for an actor method that makes remote requests.
+
+ Use this function to decorate an actor method, and pass in a tuple of
+ exceptions to catch. This decorator will schedule retries of that method
+ with exponential backoff if the original method raises any of the given
+ errors.
+ """
+ def decorator(orig_func):
+ @functools.wraps(orig_func)
+ def wrapper(self, *args, **kwargs):
+ try:
+ orig_func(self, *args, **kwargs)
+ except errors as error:
+ self._logger.warning(
+ "Client error: %s - waiting %s seconds",
+ error, self.retry_wait)
+ self._timer.schedule(self.retry_wait,
+ getattr(self._later, orig_func.__name__),
+ *args, **kwargs)
+ self.retry_wait = min(self.retry_wait * 2,
+ self.max_retry_wait)
+ else:
+ self.retry_wait = self.min_retry_wait
+ return wrapper
+ return decorator
+
+class BaseComputeNodeDriver(object):
+ """Abstract base class for compute node drivers.
+
+ libcloud abstracts away many of the differences between cloud providers,
+ but managing compute nodes requires some cloud-specific features (e.g.,
+ on EC2 we use tags to identify compute nodes). Compute node drivers
+ are responsible for translating the node manager's cloud requests to a
+ specific cloud's vocabulary.
+
+ Subclasses must implement arvados_create_kwargs (to update node
+ creation kwargs with information about the specific Arvados node
+ record), sync_node, and node_start_time.
+ """
+ def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class):
+ self.real = driver_class(**auth_kwargs)
+ self.list_kwargs = list_kwargs
+ self.create_kwargs = create_kwargs
+
+ def __getattr__(self, name):
+ # Proxy non-extension methods to the real driver.
+ if (not name.startswith('_') and not name.startswith('ex_')
+ and hasattr(self.real, name)):
+ return getattr(self.real, name)
+ else:
+ return super(BaseComputeNodeDriver, self).__getattr__(name)
+
+ def search_for(self, term, list_method, key=lambda item: item.id):
+ cache_key = (list_method, term)
+ if cache_key not in self.SEARCH_CACHE:
+ results = [item for item in getattr(self.real, list_method)()
+ if key(item) == term]
+ count = len(results)
+ if count != 1:
+ raise ValueError("{} returned {} results for '{}'".format(
+ list_method, count, term))
+ self.SEARCH_CACHE[cache_key] = results[0]
+ return self.SEARCH_CACHE[cache_key]
+
+ def list_nodes(self):
+ return self.real.list_nodes(**self.list_kwargs)
+
+ def arvados_create_kwargs(self, arvados_node):
+ raise NotImplementedError("BaseComputeNodeDriver.arvados_create_kwargs")
+
+ def create_node(self, size, arvados_node):
+ kwargs = self.create_kwargs.copy()
+ kwargs.update(self.arvados_create_kwargs(arvados_node))
+ kwargs['size'] = size
+ return self.real.create_node(**kwargs)
+
+ def sync_node(self, cloud_node, arvados_node):
+ # When a compute node first pings the API server, the API server
+ # will automatically assign some attributes on the corresponding
+ # node record, like hostname. This method should propagate that
+ # information back to the cloud node appropriately.
+ raise NotImplementedError("BaseComputeNodeDriver.sync_node")
+
+ @classmethod
+ def node_start_time(cls, node):
+ raise NotImplementedError("BaseComputeNodeDriver.node_start_time")
+
+
+class ComputeNodeSetupActor(config.actor_class):
+ """Actor to create and set up a cloud compute node.
+
+ This actor prepares an Arvados node record for a new compute node
+ (either creating one or cleaning one passed in), then boots the
+ actual compute node. It notifies subscribers when the cloud node
+ is successfully created (the last step in the process for Node
+ Manager to handle).
+ """
+ def __init__(self, timer_actor, arvados_client, cloud_client,
+ cloud_size, arvados_node=None,
+ retry_wait=1, max_retry_wait=180):
+ super(ComputeNodeSetupActor, self).__init__()
+ self._timer = timer_actor
+ self._arvados = arvados_client
+ self._cloud = cloud_client
+ self._later = self.actor_ref.proxy()
+ self._logger = logging.getLogger('arvnodeman.nodeup')
+ self.cloud_size = cloud_size
+ self.subscribers = set()
+ self.min_retry_wait = retry_wait
+ self.max_retry_wait = max_retry_wait
+ self.retry_wait = retry_wait
+ self.arvados_node = None
+ self.cloud_node = None
+ if arvados_node is None:
+ self._later.create_arvados_node()
+ else:
+ self._later.prepare_arvados_node(arvados_node)
+
+ @_retry(config.ARVADOS_ERRORS)
+ def create_arvados_node(self):
+ self.arvados_node = self._arvados.nodes().create(body={}).execute()
+ self._later.create_cloud_node()
+
+ @_retry(config.ARVADOS_ERRORS)
+ def prepare_arvados_node(self, node):
+ self.arvados_node = self._arvados.nodes().update(
+ uuid=node['uuid'],
+ body={'hostname': None,
+ 'ip_address': None,
+ 'slot_number': None,
+ 'first_ping_at': None,
+ 'last_ping_at': None,
+ 'info': {'ec2_instance_id': None,
+ 'last_action': "Prepared by Node Manager"}}
+ ).execute()
+ self._later.create_cloud_node()
+
+ @_retry(config.CLOUD_ERRORS)
+ def create_cloud_node(self):
+ self._logger.info("Creating cloud node with size %s.",
+ self.cloud_size.name)
+ self.cloud_node = self._cloud.create_node(self.cloud_size,
+ self.arvados_node)
+ self._logger.info("Cloud node %s created.", self.cloud_node.id)
+ _notify_subscribers(self._later, self.subscribers)
+ self.subscribers = None
+
+ def stop_if_no_cloud_node(self):
+ if self.cloud_node is None:
+ self.stop()
+
+ def subscribe(self, subscriber):
+ if self.subscribers is None:
+ try:
+ subscriber(self._later)
+ except pykka.ActorDeadError:
+ pass
+ else:
+ self.subscribers.add(subscriber)
+
+
+class ComputeNodeShutdownActor(config.actor_class):
+ """Actor to shut down a compute node.
+
+ This actor simply destroys a cloud node, retrying as needed.
+ """
+ def __init__(self, timer_actor, cloud_client, cloud_node,
+ retry_wait=1, max_retry_wait=180):
+ super(ComputeNodeShutdownActor, self).__init__()
+ self._timer = timer_actor
+ self._cloud = cloud_client
+ self._later = self.actor_ref.proxy()
+ self._logger = logging.getLogger('arvnodeman.nodedown')
+ self.cloud_node = cloud_node
+ self.min_retry_wait = retry_wait
+ self.max_retry_wait = max_retry_wait
+ self.retry_wait = retry_wait
+ self._later.shutdown_node()
+
+ @_retry(config.CLOUD_ERRORS)
+ def shutdown_node(self):
+ self._cloud.destroy_node(self.cloud_node)
+ self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
+
+
+class ComputeNodeUpdateActor(config.actor_class):
+ """Actor to dispatch one-off cloud management requests.
+
+ This actor receives requests for small cloud updates, and
+ dispatches them to a real driver. ComputeNodeMonitorActors use
+ this to perform maintenance tasks on themselves. Having a
+ dedicated actor for this gives us the opportunity to control the
+ flow of requests; e.g., by backing off when errors occur.
+
+ This actor is most like a "traditional" Pykka actor: there's no
+ subscribing, but instead methods return real driver results. If
+ you're interested in those results, you should get them from the
+ Future that the proxy method returns. Be prepared to handle exceptions
+ from the cloud driver when you do.
+ """
+ def __init__(self, cloud_factory, max_retry_wait=180):
+ super(ComputeNodeUpdateActor, self).__init__()
+ self._cloud = cloud_factory()
+ self.max_retry_wait = max_retry_wait
+ self.error_streak = 0
+ self.next_request_time = time.time()
+
+ def _throttle_errors(orig_func):
+ @functools.wraps(orig_func)
+ def wrapper(self, *args, **kwargs):
+ throttle_time = self.next_request_time - time.time()
+ if throttle_time > 0:
+ time.sleep(throttle_time)
+ self.next_request_time = time.time()
+ try:
+ result = orig_func(self, *args, **kwargs)
+ except config.CLOUD_ERRORS:
+ self.error_streak += 1
+ self.next_request_time += min(2 ** self.error_streak,
+ self.max_retry_wait)
+ raise
+ else:
+ self.error_streak = 0
+ return result
+ return wrapper
+
+ @_throttle_errors
+ def sync_node(self, cloud_node, arvados_node):
+ return self._cloud.sync_node(cloud_node, arvados_node)
+
+
+class ShutdownTimer(object):
+ """Keep track of a cloud node's shutdown windows.
+
+ Instantiate this class with a timestamp of when a cloud node started,
+ and a list of durations (in minutes) of when the node must not and may
+ be shut down, alternating. The class will tell you when a shutdown
+ window is open, and when the next open window will start.
+ """
+ def __init__(self, start_time, shutdown_windows):
+ # The implementation is easiest if we have an even number of windows,
+ # because then windows always alternate between open and closed.
+ # Rig that up: calculate the first shutdown window based on what's
+ # passed in. Then, if we were given an odd number of windows, merge
+ # that first window into the last one, since they both represent
+ # closed state.
+ first_window = shutdown_windows[0]
+ shutdown_windows = list(shutdown_windows[1:])
+ self._next_opening = start_time + (60 * first_window)
+ if len(shutdown_windows) % 2:
+ shutdown_windows.append(first_window)
+ else:
+ shutdown_windows[-1] += first_window
+ self.shutdown_windows = itertools.cycle([60 * n
+ for n in shutdown_windows])
+ self._open_start = self._next_opening
+ self._open_for = next(self.shutdown_windows)
+
+ def _advance_opening(self):
+ while self._next_opening < time.time():
+ self._open_start = self._next_opening
+ self._next_opening += self._open_for + next(self.shutdown_windows)
+ self._open_for = next(self.shutdown_windows)
+
+ def next_opening(self):
+ self._advance_opening()
+ return self._next_opening
+
+ def window_open(self):
+ self._advance_opening()
+ return 0 < (time.time() - self._open_start) < self._open_for
+
+
+class ComputeNodeMonitorActor(config.actor_class):
+ """Actor to manage a running compute node.
+
+ This actor gets updates about a compute node's cloud and Arvados records.
+ It uses this information to notify subscribers when the node is eligible
+ for shutdown.
+ """
+ def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
+ timer_actor, update_actor, arvados_node=None,
+ poll_stale_after=600, node_stale_after=3600):
+ super(ComputeNodeMonitorActor, self).__init__()
+ self._later = self.actor_ref.proxy()
+ self._logger = logging.getLogger('arvnodeman.computenode')
+ self._last_log = None
+ self._shutdowns = shutdown_timer
+ self._timer = timer_actor
+ self._update = update_actor
+ self.cloud_node = cloud_node
+ self.cloud_node_start_time = cloud_node_start_time
+ self.poll_stale_after = poll_stale_after
+ self.node_stale_after = node_stale_after
+ self.subscribers = set()
+ self.arvados_node = None
+ self._later.update_arvados_node(arvados_node)
+ self.last_shutdown_opening = None
+ self._later.consider_shutdown()
+
+ def subscribe(self, subscriber):
+ self.subscribers.add(subscriber)
+
+ def _debug(self, msg, *args):
+ if msg == self._last_log:
+ return
+ self._last_log = msg
+ self._logger.debug(msg, *args)
+
+ def _shutdown_eligible(self):
+ if self.arvados_node is None:
+ return timestamp_fresh(self.cloud_node_start_time,
+ self.node_stale_after)
+ else:
+ return (timestamp_fresh(arvados_node_mtime(self.arvados_node),
+ self.poll_stale_after) and
+ (self.arvados_node['info'].get('slurm_state') == 'idle'))
+
+ def consider_shutdown(self):
+ next_opening = self._shutdowns.next_opening()
+ if self._shutdowns.window_open():
+ if self._shutdown_eligible():
+ self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
+ _notify_subscribers(self._later, self.subscribers)
+ else:
+ self._debug("Node %s shutdown window open but node busy.",
+ self.cloud_node.id)
+ else:
+ self._debug("Node %s shutdown window closed. Next at %s.",
+ self.cloud_node.id, time.ctime(next_opening))
+ if self.last_shutdown_opening != next_opening:
+ self._timer.schedule(next_opening, self._later.consider_shutdown)
+ self.last_shutdown_opening = next_opening
+
+ def offer_arvados_pair(self, arvados_node):
+ if self.arvados_node is not None:
+ return None
+ elif arvados_node['ip_address'] in self.cloud_node.private_ips:
+ self._later.update_arvados_node(arvados_node)
+ return self.cloud_node.id
+ else:
+ return None
+
+ def update_cloud_node(self, cloud_node):
+ if cloud_node is not None:
+ self.cloud_node = cloud_node
+ self._later.consider_shutdown()
+
+ def update_arvados_node(self, arvados_node):
+ if arvados_node is not None:
+ self.arvados_node = arvados_node
+ new_hostname = arvados_node_fqdn(self.arvados_node)
+ if new_hostname != self.cloud_node.name:
+ self._update.sync_node(self.cloud_node, self.arvados_node)
+ self._later.consider_shutdown()
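
To make the ShutdownTimer arithmetic concrete, here is a sketch using the
"54, 5, 1" windows from doc/ec2.example.cfg below (times illustrative):

    import time
    from arvnodeman.computenode import ShutdownTimer

    boot_time = time.time()
    timer = ShutdownTimer(boot_time, [54, 5, 1])

    # The odd window count is evened out: [54, 5, 1] becomes a first
    # opening at +54 minutes, then a cycle of 5 minutes open, 55 closed.
    assert timer.next_opening() == boot_time + 54 * 60
    assert not timer.window_open()  # still inside the first closed window
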
diff --git a/services/nodemanager/arvnodeman/computenode/dummy.py b/services/nodemanager/arvnodeman/computenode/dummy.py
new file mode 100644
index 0000000..6c39fea
--- /dev/null
+++ b/services/nodemanager/arvnodeman/computenode/dummy.py
@@ -0,0 +1,52 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import time
+
+import libcloud.compute.providers as cloud_provider
+import libcloud.compute.types as cloud_types
+
+from . import BaseComputeNodeDriver, arvados_node_fqdn
+
+class ComputeNodeDriver(BaseComputeNodeDriver):
+ """Compute node driver wrapper for libcloud's dummy driver.
+
+ This class provides the glue necessary to run the node manager with a
+ dummy cloud. It's useful for testing.
+ """
+ DEFAULT_DRIVER = cloud_provider.get_driver(cloud_types.Provider.DUMMY)
+ DEFAULT_REAL = DEFAULT_DRIVER('ComputeNodeDriver')
+ DUMMY_START_TIME = time.time()
+
+ def __init__(self, auth_kwargs, list_kwargs, create_kwargs,
+ driver_class=DEFAULT_DRIVER):
+ super(ComputeNodeDriver, self).__init__(
+ auth_kwargs, list_kwargs, create_kwargs, driver_class)
+ if driver_class is self.DEFAULT_DRIVER:
+ self.real = self.DEFAULT_REAL
+
+ def _ensure_private_ip(self, node):
+ if not node.private_ips:
+ node.private_ips = ['10.10.0.{}'.format(node.id)]
+
+ def arvados_create_kwargs(self, arvados_node):
+ return {}
+
+ def list_nodes(self):
+ nodelist = super(ComputeNodeDriver, self).list_nodes()
+ for node in nodelist:
+ self._ensure_private_ip(node)
+ return nodelist
+
+ def create_node(self, size, arvados_node):
+ node = super(ComputeNodeDriver, self).create_node(size, arvados_node)
+ self._ensure_private_ip(node)
+ return node
+
+ def sync_node(self, cloud_node, arvados_node):
+ cloud_node.name = arvados_node_fqdn(arvados_node)
+
+ @classmethod
+ def node_start_time(cls, node):
+ return cls.DUMMY_START_TIME
diff --git a/services/nodemanager/arvnodeman/computenode/ec2.py b/services/nodemanager/arvnodeman/computenode/ec2.py
new file mode 100644
index 0000000..359bed4
--- /dev/null
+++ b/services/nodemanager/arvnodeman/computenode/ec2.py
@@ -0,0 +1,101 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import time
+
+import libcloud.compute.base as cloud_base
+import libcloud.compute.providers as cloud_provider
+import libcloud.compute.types as cloud_types
+from libcloud.compute.drivers import ec2 as cloud_ec2
+
+from . import BaseComputeNodeDriver, arvados_node_fqdn
+
+### Monkeypatch libcloud to support AWS' new SecurityGroup API.
+# These classes can be removed when libcloud supports specifying
+# security groups with the SecurityGroupId parameter.
+class ANMEC2Connection(cloud_ec2.EC2Connection):
+ def request(self, *args, **kwargs):
+ params = kwargs.get('params')
+ if (params is not None) and (params.get('Action') == 'RunInstances'):
+ for key in params.keys():
+ if key.startswith('SecurityGroup.'):
+ new_key = key.replace('Group.', 'GroupId.', 1)
+ params[new_key] = params.pop(key).id
+ kwargs['params'] = params
+ return super(ANMEC2Connection, self).request(*args, **kwargs)
+
+
+class ANMEC2NodeDriver(cloud_ec2.EC2NodeDriver):
+ connectionCls = ANMEC2Connection
+
+
+class ComputeNodeDriver(BaseComputeNodeDriver):
+ """Compute node driver wrapper for EC2.
+
+ This translates cloud driver requests to EC2's specific parameters.
+ """
+ DEFAULT_DRIVER = ANMEC2NodeDriver
+### End monkeypatch
+ SEARCH_CACHE = {}
+
+ def __init__(self, auth_kwargs, list_kwargs, create_kwargs,
+ driver_class=DEFAULT_DRIVER):
+ # We need full lists of keys up front because these loops modify
+ # dictionaries in-place.
+ for key in list_kwargs.keys():
+ list_kwargs[key.replace('_', ':')] = list_kwargs.pop(key)
+ self.tags = {key[4:]: value
+ for key, value in list_kwargs.iteritems()
+ if key.startswith('tag:')}
+ super(ComputeNodeDriver, self).__init__(
+ auth_kwargs, {'ex_filters': list_kwargs}, create_kwargs,
+ driver_class)
+ for key in self.create_kwargs.keys():
+ init_method = getattr(self, '_init_' + key, None)
+ if init_method is not None:
+ new_pair = init_method(self.create_kwargs.pop(key))
+ if new_pair is not None:
+ self.create_kwargs[new_pair[0]] = new_pair[1]
+
+ def _init_image_id(self, image_id):
+ return 'image', self.search_for(image_id, 'list_images')
+
+ def _init_ping_host(self, ping_host):
+ self.ping_host = ping_host
+
+ def _init_security_groups(self, group_names):
+ return 'ex_security_groups', [
+ self.search_for(gname.strip(), 'ex_get_security_groups')
+ for gname in group_names.split(',')]
+
+ def _init_subnet_id(self, subnet_id):
+ return 'ex_subnet', self.search_for(subnet_id, 'ex_list_subnets')
+
+ def _init_ssh_key(self, filename):
+ with open(filename) as ssh_file:
+ key = cloud_base.NodeAuthSSHKey(ssh_file.read())
+ return 'auth', key
+
+ def arvados_create_kwargs(self, arvados_node):
+ result = {'ex_metadata': self.tags.copy(),
+ 'name': arvados_node_fqdn(arvados_node)}
+ ping_secret = arvados_node['info'].get('ping_secret')
+ if ping_secret is not None:
+ ping_url = ('https://{}/arvados/v1/nodes/{}/ping?ping_secret={}'.
+ format(self.ping_host, arvados_node['uuid'],
+ ping_secret))
+ result['ex_userdata'] = ping_url
+ return result
+
+ def sync_node(self, cloud_node, arvados_node):
+ metadata = self.arvados_create_kwargs(arvados_node)
+ tags = metadata['ex_metadata']
+ tags['Name'] = metadata['name']
+ self.real.ex_create_tags(cloud_node, tags)
+
+ @classmethod
+ def node_start_time(cls, node):
+ time_str = node.extra['launch_time'].split('.', 2)[0] + 'UTC'
+ return time.mktime(time.strptime(
+ time_str, '%Y-%m-%dT%H:%M:%S%Z')) - time.timezone
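
The constructor's filter-key rewriting is easy to miss; a standalone sketch
of what it does to the [Cloud List] settings from doc/ec2.example.cfg:

    list_kwargs = {'instance-state-name': 'running',
                   'tag_arvados-class': 'dynamic-compute',
                   'tag_cluster': 'zyxwv'}
    for key in list_kwargs.keys():
        list_kwargs[key.replace('_', ':')] = list_kwargs.pop(key)
    tags = {key[4:]: value for key, value in list_kwargs.iteritems()
            if key.startswith('tag:')}
    # list_kwargs -> {'instance-state-name': 'running',
    #                 'tag:arvados-class': 'dynamic-compute',
    #                 'tag:cluster': 'zyxwv'}
    # tags -> {'arvados-class': 'dynamic-compute', 'cluster': 'zyxwv'}
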
diff --git a/services/nodemanager/arvnodeman/config.py b/services/nodemanager/arvnodeman/config.py
new file mode 100644
index 0000000..07504e2
--- /dev/null
+++ b/services/nodemanager/arvnodeman/config.py
@@ -0,0 +1,108 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import ConfigParser
+import importlib
+import logging
+import ssl
+
+import apiclient.errors as apierror
+import arvados
+import httplib2
+import libcloud.common.types as cloud_types
+import pykka
+
+# IOError is the base class for socket.error and friends.
+# It seems like it hits the sweet spot for operations we want to retry:
+# it's low-level, but unlikely to catch code bugs.
+NETWORK_ERRORS = (IOError, ssl.SSLError)
+ARVADOS_ERRORS = NETWORK_ERRORS + (apierror.Error,)
+CLOUD_ERRORS = NETWORK_ERRORS + (cloud_types.LibcloudError,)
+
+actor_class = pykka.ThreadingActor
+
+class NodeManagerConfig(ConfigParser.SafeConfigParser):
+ """Node Manager Configuration class.
+
+ This is a standard Python ConfigParser, with additional helper methods to
+ create objects instantiated with configuration information.
+ """
+
+ LOGGING_NONLEVELS = frozenset(['file'])
+
+ def __init__(self, *args, **kwargs):
+ # Can't use super() because SafeConfigParser is an old-style class.
+ ConfigParser.SafeConfigParser.__init__(self, *args, **kwargs)
+ for sec_name, settings in {
+ 'Arvados': {'insecure': 'no',
+ 'timeout': '15'},
+ 'Daemon': {'max_nodes': '1',
+ 'poll_time': '60',
+ 'max_poll_time': '300',
+ 'poll_stale_after': '600',
+ 'node_stale_after': str(60 * 60 * 2)},
+ 'Logging': {'file': '/dev/stderr',
+ 'level': 'WARNING'},
+ }.iteritems():
+ if not self.has_section(sec_name):
+ self.add_section(sec_name)
+ for opt_name, value in settings.iteritems():
+ if not self.has_option(sec_name, opt_name):
+ self.set(sec_name, opt_name, value)
+
+ def get_section(self, section, transformer=None):
+ result = self._dict()
+ for key, value in self.items(section):
+ if transformer is not None:
+ try:
+ value = transformer(value)
+ except (TypeError, ValueError):
+ pass
+ result[key] = value
+ return result
+
+ def log_levels(self):
+ return {key: getattr(logging, self.get('Logging', key).upper())
+ for key in self.options('Logging')
+ if key not in self.LOGGING_NONLEVELS}
+
+ def new_arvados_client(self):
+ if self.has_option('Daemon', 'certs_file'):
+ certs_file = self.get('Daemon', 'certs_file')
+ else:
+ certs_file = None
+ insecure = self.getboolean('Arvados', 'insecure')
+ http = httplib2.Http(timeout=self.getint('Arvados', 'timeout'),
+ ca_certs=certs_file,
+ disable_ssl_certificate_validation=insecure)
+ return arvados.api('v1',
+ cache=False, # Don't reuse an existing client.
+ host=self.get('Arvados', 'host'),
+ token=self.get('Arvados', 'token'),
+ insecure=insecure,
+ http=http)
+
+ def new_cloud_client(self):
+ module = importlib.import_module('arvnodeman.computenode.' +
+ self.get('Cloud', 'provider'))
+ auth_kwargs = self.get_section('Cloud Credentials')
+ if 'timeout' in auth_kwargs:
+ auth_kwargs['timeout'] = int(auth_kwargs['timeout'])
+ return module.ComputeNodeDriver(auth_kwargs,
+ self.get_section('Cloud List'),
+ self.get_section('Cloud Create'))
+
+ def node_sizes(self, all_sizes):
+ size_kwargs = {}
+ for sec_name in self.sections():
+ sec_words = sec_name.split(None, 2)
+ if sec_words[0] != 'Size':
+ continue
+ size_kwargs[sec_words[1]] = self.get_section(sec_name, int)
+ return [(size, size_kwargs[size.id]) for size in all_sizes
+ if size.id in size_kwargs]
+
+ def shutdown_windows(self):
+ return [int(n)
+ for n in self.get('Cloud', 'shutdown_windows').split(',')]
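
A short sketch of how these helpers are meant to be used, assuming the EC2
example config shipped in this commit (file path illustrative):

    from arvnodeman.config import NodeManagerConfig

    config = NodeManagerConfig()
    with open('doc/ec2.example.cfg') as config_file:
        config.readfp(config_file)

    config.getint('Daemon', 'max_nodes')   # 8, from the file
    config.getint('Arvados', 'timeout')    # 15 (also the built-in default)
    config.log_levels()                    # {'level': 20, 'pykka': 30, ...}
    config.shutdown_windows()              # [54, 5, 1]
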
diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
new file mode 100644
index 0000000..5b7437f
--- /dev/null
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -0,0 +1,294 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import functools
+import logging
+import time
+
+import pykka
+
+from . import computenode as cnode
+from .config import actor_class
+
+class _ComputeNodeRecord(object):
+ def __init__(self, actor=None, cloud_node=None, arvados_node=None,
+ assignment_time=float('-inf')):
+ self.actor = actor
+ self.cloud_node = cloud_node
+ self.arvados_node = arvados_node
+ self.assignment_time = assignment_time
+
+
+class _BaseNodeTracker(object):
+ def __init__(self):
+ self.nodes = {}
+ self.orphans = {}
+
+ def __getitem__(self, key):
+ return self.nodes[key]
+
+ def __len__(self):
+ return len(self.nodes)
+
+ def get(self, key, default=None):
+ return self.nodes.get(key, default)
+
+ def record_key(self, record):
+ return self.item_key(getattr(record, self.RECORD_ATTR))
+
+ def add(self, record):
+ self.nodes[self.record_key(record)] = record
+
+ def update_record(self, key, item):
+ setattr(self.nodes[key], self.RECORD_ATTR, item)
+
+ def update_from(self, response):
+ unseen = set(self.nodes.iterkeys())
+ for item in response:
+ key = self.item_key(item)
+ if key in unseen:
+ unseen.remove(key)
+ self.update_record(key, item)
+ else:
+ yield key, item
+ self.orphans = {key: self.nodes.pop(key) for key in unseen}
+
+ def unpaired(self):
+ return (record for record in self.nodes.itervalues()
+ if getattr(record, self.PAIR_ATTR) is None)
+
+
+class _CloudNodeTracker(_BaseNodeTracker):
+ RECORD_ATTR = 'cloud_node'
+ PAIR_ATTR = 'arvados_node'
+ item_key = staticmethod(lambda cloud_node: cloud_node.id)
+
+
+class _ArvadosNodeTracker(_BaseNodeTracker):
+ RECORD_ATTR = 'arvados_node'
+ PAIR_ATTR = 'cloud_node'
+ item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
+
+ def find_stale_node(self, stale_time):
+ for record in self.nodes.itervalues():
+ node = record.arvados_node
+ if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
+ stale_time) and
+ not cnode.timestamp_fresh(record.assignment_time,
+ stale_time)):
+ return node
+ return None
+
+
+class NodeManagerDaemonActor(actor_class):
+ """Node Manager daemon.
+
+ This actor subscribes to all information polls about cloud nodes,
+ Arvados nodes, and the job queue. It creates a ComputeNodeMonitorActor
+ for every cloud node, subscribing them to poll updates
+ appropriately. It creates and destroys cloud nodes based on job queue
+ demand, and stops the corresponding ComputeNode actors when their work
+ is done.
+ """
+ def __init__(self, server_wishlist_actor, arvados_nodes_actor,
+ cloud_nodes_actor, cloud_update_actor, timer_actor,
+ arvados_factory, cloud_factory,
+ shutdown_windows, max_nodes,
+ poll_stale_after=600, node_stale_after=7200,
+ node_setup_class=cnode.ComputeNodeSetupActor,
+ node_shutdown_class=cnode.ComputeNodeShutdownActor,
+ node_actor_class=cnode.ComputeNodeMonitorActor):
+ super(NodeManagerDaemonActor, self).__init__()
+ self._node_setup = node_setup_class
+ self._node_shutdown = node_shutdown_class
+ self._node_actor = node_actor_class
+ self._cloud_updater = cloud_update_actor
+ self._timer = timer_actor
+ self._new_arvados = arvados_factory
+ self._new_cloud = cloud_factory
+ self._cloud_driver = self._new_cloud()
+ self._logger = logging.getLogger('arvnodeman.daemon')
+ self._later = self.actor_ref.proxy()
+ self.shutdown_windows = shutdown_windows
+ self.max_nodes = max_nodes
+ self.poll_stale_after = poll_stale_after
+ self.node_stale_after = node_stale_after
+ self.last_polls = {}
+ for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
+ poll_actor = locals()[poll_name + '_actor']
+ poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
+ setattr(self, '_{}_actor'.format(poll_name), poll_actor)
+ self.last_polls[poll_name] = -self.poll_stale_after
+ self.cloud_nodes = _CloudNodeTracker()
+ self.arvados_nodes = _ArvadosNodeTracker()
+ self.booting = {} # Actor IDs to ComputeNodeSetupActors
+ self.shutdowns = {} # Cloud node IDs to ComputeNodeShutdownActors
+ self._logger.debug("Daemon initialized")
+
+ def _update_poll_time(self, poll_key):
+ self.last_polls[poll_key] = time.time()
+
+ def _pair_nodes(self, node_record, arvados_node):
+ self._logger.info("Cloud node %s has associated with Arvados node %s",
+ node_record.cloud_node.id, arvados_node['uuid'])
+ self._arvados_nodes_actor.subscribe_to(
+ arvados_node['uuid'], node_record.actor.update_arvados_node)
+ node_record.arvados_node = arvados_node
+ self.arvados_nodes.add(node_record)
+
+ def _new_node(self, cloud_node):
+ start_time = self._cloud_driver.node_start_time(cloud_node)
+ shutdown_timer = cnode.ShutdownTimer(start_time,
+ self.shutdown_windows)
+ actor = self._node_actor.start(
+ cloud_node=cloud_node,
+ cloud_node_start_time=start_time,
+ shutdown_timer=shutdown_timer,
+ update_actor=self._cloud_updater,
+ timer_actor=self._timer,
+ arvados_node=None,
+ poll_stale_after=self.poll_stale_after,
+ node_stale_after=self.node_stale_after).proxy()
+ actor.subscribe(self._later.node_can_shutdown)
+ self._cloud_nodes_actor.subscribe_to(cloud_node.id,
+ actor.update_cloud_node)
+ record = _ComputeNodeRecord(actor, cloud_node)
+ self.cloud_nodes.add(record)
+ return record
+
+ def update_cloud_nodes(self, nodelist):
+ self._update_poll_time('cloud_nodes')
+ for key, node in self.cloud_nodes.update_from(nodelist):
+ self._logger.info("Registering new cloud node %s", key)
+ record = self._new_node(node)
+ for arv_rec in self.arvados_nodes.unpaired():
+ if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
+ self._pair_nodes(record, arv_rec.arvados_node)
+ break
+ for key, record in self.cloud_nodes.orphans.iteritems():
+ record.actor.stop()
+ if key in self.shutdowns:
+ self.shutdowns.pop(key).stop()
+
+ def update_arvados_nodes(self, nodelist):
+ self._update_poll_time('arvados_nodes')
+ for key, node in self.arvados_nodes.update_from(nodelist):
+ self._logger.info("Registering new Arvados node %s", key)
+ record = _ComputeNodeRecord(arvados_node=node)
+ self.arvados_nodes.add(record)
+ for arv_rec in self.arvados_nodes.unpaired():
+ arv_node = arv_rec.arvados_node
+ for cloud_rec in self.cloud_nodes.unpaired():
+ if cloud_rec.actor.offer_arvados_pair(arv_node).get():
+ self._pair_nodes(cloud_rec, arv_node)
+ break
+
+ def _node_count(self):
+ up = sum(len(nodelist) for nodelist in [self.cloud_nodes, self.booting])
+ return up - len(self.shutdowns)
+
+ def _nodes_wanted(self):
+ return len(self.last_wishlist) - self._node_count()
+
+ def _nodes_excess(self):
+ return -self._nodes_wanted()
+
+ def update_server_wishlist(self, wishlist):
+ self._update_poll_time('server_wishlist')
+ self.last_wishlist = wishlist[:self.max_nodes]
+ nodes_wanted = self._nodes_wanted()
+ if nodes_wanted > 0:
+ self._later.start_node()
+ elif (nodes_wanted < 0) and self.booting:
+ self._later.stop_booting_node()
+
+ def _check_poll_freshness(orig_func):
+ """Decorator to inhibit a method when poll information is stale.
+
+ This decorator checks the timestamps of all the poll information the
+ daemon has received. The decorated method is only called if none
+ of the timestamps are considered stale.
+ """
+ @functools.wraps(orig_func)
+ def wrapper(self, *args, **kwargs):
+ now = time.time()
+ if all(now - t < self.poll_stale_after
+ for t in self.last_polls.itervalues()):
+ return orig_func(self, *args, **kwargs)
+ else:
+ return None
+ return wrapper
+
+ @_check_poll_freshness
+ def start_node(self):
+ nodes_wanted = self._nodes_wanted()
+ if nodes_wanted < 1:
+ return None
+ arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
+ cloud_size = self.last_wishlist[nodes_wanted - 1]
+ self._logger.info("Want %s more nodes. Booting a %s node.",
+ nodes_wanted, cloud_size.name)
+ new_setup = self._node_setup.start(
+ timer_actor=self._timer,
+ arvados_client=self._new_arvados(),
+ arvados_node=arvados_node,
+ cloud_client=self._new_cloud(),
+ cloud_size=cloud_size).proxy()
+ self.booting[new_setup.actor_ref.actor_urn] = new_setup
+ if arvados_node is not None:
+ self.arvados_nodes[arvados_node['uuid']].assignment_time = (
+ time.time())
+ new_setup.subscribe(self._later.node_up)
+ if nodes_wanted > 1:
+ self._later.start_node()
+
+ def _actor_nodes(self, node_actor):
+ return pykka.get_all([node_actor.cloud_node, node_actor.arvados_node])
+
+ def node_up(self, setup_proxy):
+ cloud_node, arvados_node = self._actor_nodes(setup_proxy)
+ del self.booting[setup_proxy.actor_ref.actor_urn]
+ setup_proxy.stop()
+ record = self.cloud_nodes.get(cloud_node.id)
+ if record is None:
+ record = self._new_node(cloud_node)
+ self._pair_nodes(record, arvados_node)
+
+ @_check_poll_freshness
+ def stop_booting_node(self):
+ nodes_excess = self._nodes_excess()
+ if (nodes_excess < 1) or not self.booting:
+ return None
+ for key, node in self.booting.iteritems():
+ node.stop_if_no_cloud_node().get()
+ if not node.actor_ref.is_alive():
+ del self.booting[key]
+ if nodes_excess > 1:
+ self._later.stop_booting_node()
+ break
+
+ @_check_poll_freshness
+ def node_can_shutdown(self, node_actor):
+ if self._nodes_excess() < 1:
+ return None
+ cloud_node, arvados_node = self._actor_nodes(node_actor)
+ if cloud_node.id in self.shutdowns:
+ return None
+ shutdown = self._node_shutdown.start(timer_actor=self._timer,
+ cloud_client=self._new_cloud(),
+ cloud_node=cloud_node).proxy()
+ self.shutdowns[cloud_node.id] = shutdown
+
+ def shutdown(self):
+ self._logger.info("Shutting down after signal.")
+ self.poll_stale_after = -1 # Inhibit starting/stopping nodes
+ for bootnode in self.booting.itervalues():
+ bootnode.stop_if_no_cloud_node()
+ self._later.await_shutdown()
+
+ def await_shutdown(self):
+ if any(node.actor_ref.is_alive() for node in self.booting.itervalues()):
+ self._timer.schedule(time.time() + 1, self._later.await_shutdown)
+ else:
+ self.stop()
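
The daemon's sizing decisions reduce to simple bookkeeping; a worked sketch
of _node_count() and _nodes_wanted() with illustrative numbers:

    # Suppose 3 cloud nodes are tracked, 1 setup actor is booting a node,
    # 1 shutdown actor is tearing one down, and the wishlist has 5 entries.
    node_count = 3 + 1 - 1         # _node_count(): cloud + booting - shutdowns
    nodes_wanted = 5 - node_count  # _nodes_wanted(): 2, so start_node() runs,
                                   # then re-queues itself once via _later
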
diff --git a/services/nodemanager/arvnodeman/jobqueue.py b/services/nodemanager/arvnodeman/jobqueue.py
new file mode 100644
index 0000000..08ee12e
--- /dev/null
+++ b/services/nodemanager/arvnodeman/jobqueue.py
@@ -0,0 +1,96 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+from . import clientactor
+from .config import ARVADOS_ERRORS
+
+class ServerCalculator(object):
+ """Generate cloud server wishlists from an Arvados job queue.
+
+ Instantiate this class with a list of cloud node sizes you're willing to
+ use, plus keyword overrides from the configuration. Then you can pass
+ job queues to servers_for_queue. It will return a list of node sizes
+ that would best satisfy the jobs, choosing the cheapest size that
+ satisfies each job, and ignoring jobs that can't be satisfied.
+ """
+
+ class CloudSizeWrapper(object):
+ def __init__(self, real_size, **kwargs):
+ self.real = real_size
+ for name in ['id', 'name', 'ram', 'disk', 'bandwidth', 'price',
+ 'extra']:
+ setattr(self, name, getattr(self.real, name))
+ self.cores = kwargs.pop('cores')
+ self.scratch = self.disk
+ for name, override in kwargs.iteritems():
+ if not hasattr(self, name):
+ raise ValueError("unrecognized size field '%s'" % (name,))
+ setattr(self, name, override)
+
+ def meets_constraints(self, **kwargs):
+ for name, want_value in kwargs.iteritems():
+ have_value = getattr(self, name)
+ if (have_value != 0) and (have_value < want_value):
+ return False
+ return True
+
+
+ def __init__(self, server_list, max_nodes=None):
+ self.cloud_sizes = [self.CloudSizeWrapper(s, **kws)
+ for s, kws in server_list]
+ self.cloud_sizes.sort(key=lambda s: s.price)
+ self.max_nodes = max_nodes or float("inf")
+
+ @staticmethod
+ def coerce_int(x, fallback):
+ try:
+ return int(x)
+ except (TypeError, ValueError):
+ return fallback
+
+ def cloud_size_for_constraints(self, constraints):
+ want_value = lambda key: self.coerce_int(constraints.get(key), 0)
+ wants = {'cores': want_value('min_cores_per_node'),
+ 'ram': want_value('min_ram_mb_per_node'),
+ 'scratch': want_value('min_scratch_mb_per_node')}
+ for size in self.cloud_sizes:
+ if size.meets_constraints(**wants):
+ return size
+ return None
+
+ def servers_for_queue(self, queue):
+ servers = []
+ for job in queue:
+ constraints = job['runtime_constraints']
+ want_count = self.coerce_int(constraints.get('min_nodes'), 1)
+ cloud_size = self.cloud_size_for_constraints(constraints)
+ if (want_count < self.max_nodes) and (cloud_size is not None):
+ servers.extend([cloud_size.real] * max(1, want_count))
+ return servers
+
+
+class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
+ """Actor to generate server wishlists from the job queue.
+
+ This actor regularly polls Arvados' job queue, and uses the provided
+ ServerCalculator to turn that into a list of requested node sizes. That
+ list is sent to subscribers on every poll.
+ """
+
+ CLIENT_ERRORS = ARVADOS_ERRORS
+ LOGGER_NAME = 'arvnodeman.jobqueue'
+
+ def __init__(self, client, timer_actor, server_calc, *args, **kwargs):
+ super(JobQueueMonitorActor, self).__init__(
+ client, timer_actor, *args, **kwargs)
+ self._calculator = server_calc
+
+ def _send_request(self):
+ return self._client.jobs().queue().execute()['items']
+
+ def _got_response(self, queue):
+ server_list = self._calculator.servers_for_queue(queue)
+ self._logger.debug("Sending server wishlist: %s",
+ ', '.join(s.name for s in server_list))
+ return super(JobQueueMonitorActor, self)._got_response(server_list)
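
A sketch of ServerCalculator in isolation, with a fabricated size object
standing in for a libcloud NodeSize (only the attributes CloudSizeWrapper
copies are needed):

    import collections
    from arvnodeman.jobqueue import ServerCalculator

    FakeSize = collections.namedtuple(
        'FakeSize', ['id', 'name', 'ram', 'disk', 'bandwidth', 'price', 'extra'])
    small = FakeSize('t2.medium', 't2.medium', 4096, 100, 0, 0.05, {})

    calc = ServerCalculator([(small, {'cores': 2})], max_nodes=8)
    queue = [{'runtime_constraints': {'min_nodes': 2,
                                      'min_cores_per_node': 2}}]
    calc.servers_for_queue(queue)  # [small, small]
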
diff --git a/services/nodemanager/arvnodeman/launcher.py b/services/nodemanager/arvnodeman/launcher.py
new file mode 100644
index 0000000..87f2dda
--- /dev/null
+++ b/services/nodemanager/arvnodeman/launcher.py
@@ -0,0 +1,130 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import argparse
+import logging
+import signal
+import sys
+import time
+
+import daemon
+import pykka
+
+from . import config as nmconfig
+from .computenode import \
+ ComputeNodeSetupActor, ComputeNodeShutdownActor, ComputeNodeUpdateActor, \
+ ShutdownTimer
+from .daemon import NodeManagerDaemonActor
+from .jobqueue import JobQueueMonitorActor, ServerCalculator
+from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
+from .timedcallback import TimedCallBackActor
+
+node_daemon = None
+
+def abort(msg, code=1):
+ print("arvados-node-manager: " + msg)
+ sys.exit(code)
+
+def parse_cli(args):
+ parser = argparse.ArgumentParser(
+ prog='arvados-node-manager',
+ description="Dynamically allocate Arvados cloud compute nodes")
+ parser.add_argument(
+ '--foreground', action='store_true', default=False,
+ help="Run in the foreground. Don't daemonize.")
+ parser.add_argument(
+ '--config', help="Path to configuration file")
+ return parser.parse_args(args)
+
+def load_config(path):
+ if not path:
+ abort("No --config file specified", 2)
+ config = nmconfig.NodeManagerConfig()
+ try:
+ with open(path) as config_file:
+ config.readfp(config_file)
+ except (IOError, OSError) as error:
+ abort("Error reading configuration file {}: {}".format(path, error))
+ return config
+
+def setup_logging(path, level, **sublevels):
+ handler = logging.FileHandler(path)
+ handler.setFormatter(logging.Formatter(
+ '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
+ '%Y-%m-%d %H:%M:%S'))
+ root_logger = logging.getLogger()
+ root_logger.addHandler(handler)
+ root_logger.setLevel(level)
+ for logger_name, sublevel in sublevels.iteritems():
+ sublogger = logging.getLogger(logger_name)
+ sublogger.setLevel(sublevel)
+
+def launch_pollers(config):
+ cloud_client = config.new_cloud_client()
+ arvados_client = config.new_arvados_client()
+ cloud_size_list = config.node_sizes(cloud_client.list_sizes())
+ if not cloud_size_list:
+ abort("No valid node sizes configured")
+
+ server_calculator = ServerCalculator(
+ cloud_size_list, config.getint('Daemon', 'max_nodes'))
+ poll_time = config.getint('Daemon', 'poll_time')
+ max_poll_time = config.getint('Daemon', 'max_poll_time')
+
+ timer = TimedCallBackActor.start(poll_time / 10.0).proxy()
+ cloud_node_poller = CloudNodeListMonitorActor.start(
+ cloud_client, timer, poll_time, max_poll_time).proxy()
+ arvados_node_poller = ArvadosNodeListMonitorActor.start(
+ arvados_client, timer, poll_time, max_poll_time).proxy()
+ job_queue_poller = JobQueueMonitorActor.start(
+ config.new_arvados_client(), timer, server_calculator,
+ poll_time, max_poll_time).proxy()
+ return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
+
+_caught_signals = {}
+def shutdown_signal(signal_code, frame):
+ current_count = _caught_signals.get(signal_code, 0)
+ _caught_signals[signal_code] = current_count + 1
+ if node_daemon is None:
+ pykka.ActorRegistry.stop_all()
+ sys.exit(-signal_code)
+ elif current_count == 0:
+ node_daemon.shutdown()
+ elif current_count == 1:
+ pykka.ActorRegistry.stop_all()
+ else:
+ sys.exit(-signal_code)
+
+def main(args=None):
+ global node_daemon
+ args = parse_cli(args)
+ config = load_config(args.config)
+
+ if not args.foreground:
+ daemon.DaemonContext().open()
+ for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
+ signal.signal(sigcode, shutdown_signal)
+
+ setup_logging(config.get('Logging', 'file'), **config.log_levels())
+ timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
+ launch_pollers(config)
+ cloud_node_updater = ComputeNodeUpdateActor.start(
+ config.new_cloud_client).proxy()
+ node_daemon = NodeManagerDaemonActor.start(
+ job_queue_poller, arvados_node_poller, cloud_node_poller,
+ cloud_node_updater, timer,
+ config.new_arvados_client, config.new_cloud_client,
+ config.shutdown_windows(), config.getint('Daemon', 'max_nodes'),
+ config.getint('Daemon', 'poll_stale_after'),
+ config.getint('Daemon', 'node_stale_after')).proxy()
+
+ signal.pause()
+ daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
+ while not daemon_stopped():
+ time.sleep(1)
+ pykka.ActorRegistry.stop_all()
+
+
+if __name__ == '__main__':
+ main()
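
Given parse_cli above, a development invocation (path illustrative) looks
like:

    arvados-node-manager --foreground --config doc/local.example.cfg

Without --foreground, the process daemonizes via python-daemon before
entering its polling loop.
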
diff --git a/services/nodemanager/arvnodeman/nodelist.py b/services/nodemanager/arvnodeman/nodelist.py
new file mode 100644
index 0000000..7ddfb7c
--- /dev/null
+++ b/services/nodemanager/arvnodeman/nodelist.py
@@ -0,0 +1,39 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+from . import clientactor
+from . import config
+
+class ArvadosNodeListMonitorActor(clientactor.RemotePollLoopActor):
+ """Actor to poll the Arvados node list.
+
+ This actor regularly polls the list of Arvados node records, and
+ sends it to subscribers.
+ """
+
+ CLIENT_ERRORS = config.ARVADOS_ERRORS
+ LOGGER_NAME = 'arvnodeman.arvados_nodes'
+
+ def _item_key(self, node):
+ return node['uuid']
+
+ def _send_request(self):
+ return self._client.nodes().list(limit=10000).execute()['items']
+
+
+class CloudNodeListMonitorActor(clientactor.RemotePollLoopActor):
+ """Actor to poll the cloud node list.
+
+ This actor regularly polls the cloud to get a list of running compute
+ nodes, and sends it to subscribers.
+ """
+
+ CLIENT_ERRORS = config.CLOUD_ERRORS
+ LOGGER_NAME = 'arvnodeman.cloud_nodes'
+
+ def _item_key(self, node):
+ return node.id
+
+ def _send_request(self):
+ return self._client.list_nodes()
diff --git a/services/nodemanager/arvnodeman/timedcallback.py b/services/nodemanager/arvnodeman/timedcallback.py
new file mode 100644
index 0000000..a1df8ec
--- /dev/null
+++ b/services/nodemanager/arvnodeman/timedcallback.py
@@ -0,0 +1,41 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import heapq
+import time
+
+import pykka
+
+from .config import actor_class
+
+class TimedCallBackActor(actor_class):
+ """Send messages to other actors on a schedule.
+
+ Other actors can call the schedule() method to schedule delivery of a
+ message at a later time. This actor runs the necessary event loop for
+ delivery.
+ """
+ def __init__(self, max_sleep=1):
+ super(TimedCallBackActor, self).__init__()
+ self._proxy = self.actor_ref.proxy()
+ self.messages = []
+ self.max_sleep = max_sleep
+
+ def schedule(self, delivery_time, receiver, *args, **kwargs):
+ heapq.heappush(self.messages, (delivery_time, receiver, args, kwargs))
+ self._proxy.deliver()
+
+ def deliver(self):
+ if not self.messages:
+ return None
+ til_next = self.messages[0][0] - time.time()
+ if til_next < 0:
+ t, receiver, args, kwargs = heapq.heappop(self.messages)
+ try:
+ receiver(*args, **kwargs)
+ except pykka.ActorDeadError:
+ pass
+ else:
+ time.sleep(min(til_next, self.max_sleep))
+ self._proxy.deliver()
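
A sketch of scheduling through TimedCallBackActor; the receiver is normally
another actor's proxy method, but any callable works (only ActorDeadError
is swallowed):

    from __future__ import print_function
    import time
    import pykka
    from arvnodeman.timedcallback import TimedCallBackActor

    timer = TimedCallBackActor.start(max_sleep=1).proxy()
    timer.schedule(time.time() + 5, print, 'five seconds elapsed')
    time.sleep(6)                   # give the actor time to deliver
    pykka.ActorRegistry.stop_all()  # stop the actor thread when done
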
diff --git a/services/nodemanager/bin/arvados-node-manager b/services/nodemanager/bin/arvados-node-manager
new file mode 100644
index 0000000..3a91288
--- /dev/null
+++ b/services/nodemanager/bin/arvados-node-manager
@@ -0,0 +1,6 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+from arvnodeman.launcher import main
+main()
diff --git a/services/nodemanager/doc/ec2.example.cfg b/services/nodemanager/doc/ec2.example.cfg
new file mode 100644
index 0000000..a56e69e
--- /dev/null
+++ b/services/nodemanager/doc/ec2.example.cfg
@@ -0,0 +1,121 @@
+# EC2 configuration for Arvados Node Manager.
+# All times are in seconds unless specified otherwise.
+
+[Daemon]
+# Node Manager will not start any compute nodes when at least this
+# many are running.
+max_nodes = 8
+
+# Poll EC2 nodes and Arvados for new information every N seconds.
+poll_time = 60
+
+# Polls have exponential backoff when services fail to respond.
+# This is the longest time to wait between polls.
+max_poll_time = 300
+
+# If Node Manager can't successfully poll a service for this long,
+# it will never start or stop compute nodes, on the assumption that its
+# information is too outdated.
+poll_stale_after = 600
+
+# "Node stale time" affects two related behaviors.
+# 1. If a compute node has been running for at least this long, but it
+# isn't paired with an Arvados node, do not shut it down, but leave it alone.
+# This prevents the node manager from shutting down a node that might
+# actually be doing work, but is having temporary trouble contacting the
+# API server.
+# 2. When the Node Manager starts a new compute node, it will try to reuse
+# an Arvados node that hasn't been updated for this long.
+node_stale_after = 14400
+
+# File path for Certificate Authorities
+certs_file = /etc/ssl/certs/ca-certificates.crt
+
+[Logging]
+# Log file path
+file = /var/log/arvados/node-manager.log
+
+# Log level for most Node Manager messages.
+# Choose one of DEBUG, INFO, WARNING, ERROR, or CRITICAL.
+# WARNING lets you know when polling a service fails.
+# INFO additionally lets you know when a compute node is started or stopped.
+level = INFO
+
+# You can also set different log levels for specific libraries.
+# Pykka is the Node Manager's actor library.
+# Setting this to DEBUG will display tracebacks for uncaught
+# exceptions in the actors, but it's also very chatty.
+pykka = WARNING
+
+# Setting apiclient to INFO will log the URL of every Arvados API request.
+apiclient = WARNING
+
+[Arvados]
+host = zyxwv.arvadosapi.com
+token = ARVADOS_TOKEN
+timeout = 15
+
+# Accept an untrusted SSL certificate from the API server?
+insecure = no
+
+[Cloud]
+provider = ec2
+
+# It's usually most cost-effective to shut down compute nodes during narrow
+# windows of time. For example, EC2 bills each node by the hour, so the best
+# time to shut down a node is right before a new hour of uptime starts.
+# Shutdown windows define these periods of time. These are windows in
+# full minutes, separated by commas. Counting from the time the node is
+# booted, the node WILL NOT shut down for N1 minutes; then it MAY shut down
+# for N2 minutes; then it WILL NOT shut down for N3 minutes; and so on.
+# For example, "54, 5, 1" means the node may shut down from the 54th to the
+# 59th minute of each hour of uptime.
+# Specify at least two windows. You can add as many as you need beyond that.
+shutdown_windows = 54, 5, 1
+
+[Cloud Credentials]
+key = KEY
+secret = SECRET_KEY
+region = us-east-1
+timeout = 60
+
+[Cloud List]
+# This section defines filters that find compute nodes.
+# Tags that you specify here will automatically be added to nodes you create.
+# Replace colons in Amazon filters with underscores
+# (e.g., write "tag:mytag" as "tag_mytag").
+instance-state-name = running
+tag_arvados-class = dynamic-compute
+tag_cluster = zyxwv
+
+[Cloud Create]
+# New compute nodes will send pings to Arvados at this host.
+# You may specify a port, and use brackets to disambiguate IPv6 addresses.
+ping_host = hostname:port
+
+# Give the name of an SSH key on AWS...
+ex_keyname = string
+
+# ... or a file path for an SSH key that can log in to the compute node.
+# (One or the other, not both.)
+# ssh_key = path
+
+# The EC2 IDs of the image and subnet compute nodes should use.
+image_id = idstring
+subnet_id = idstring
+
+# Comma-separated EC2 IDs for the security group(s) assigned to each
+# compute node.
+security_groups = idstring1, idstring2
+
+[Size t2.medium]
+# You can define any number of Size sections to list EC2 sizes you're
+# willing to use. The Node Manager should boot the cheapest size(s) that
+# can run jobs in the queue (N.B.: defining more than one size has not been
+# tested yet).
+# Each size section MUST define the number of cores it has. You may also
+# want to define the number of mebibytes of scratch space for Crunch jobs.
+# You can also override Amazon's provided data fields by setting the same
+# names here.
+cores = 2
+scratch = 100
\ No newline at end of file
diff --git a/services/nodemanager/doc/local.example.cfg b/services/nodemanager/doc/local.example.cfg
new file mode 100644
index 0000000..8a6e626
--- /dev/null
+++ b/services/nodemanager/doc/local.example.cfg
@@ -0,0 +1,41 @@
+# You can use this configuration to run a development Node Manager for
+# testing. It uses libcloud's dummy driver and your own development API server.
+# When new cloud nodes are created, you'll need to simulate the ping that
+# they send to the Arvados API server. The easiest way I've found to do that
+# is through the API server Rails console: load the Node object, set its
+# IP address to 10.10.0.N (where N is the cloud node's ID), and save.
+
+[Daemon]
+max_nodes = 8
+poll_time = 15
+max_poll_time = 60
+poll_stale_after = 600
+node_stale_after = 300
+certs_file = /etc/ssl/certs/ca-certificates.crt
+
+[Logging]
+level = DEBUG
+pykka = DEBUG
+apiclient = WARNING
+
+[Arvados]
+host = localhost:3030
+# This is the token for the test fixture's admin user.
+token = 4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h
+insecure = yes
+timeout = 15
+
+[Cloud]
+provider = dummy
+shutdown_windows = 1, 1
+timeout = 15
+
+[Cloud Credentials]
+creds = dummycreds
+
+[Cloud List]
+[Cloud Create]
+
+[Size 2]
+cores = 4
+scratch = 1234
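As an alternative to the Rails console step described at the top of this file, the fake ping can also be sketched with the Arvados Python SDK. The helper below is hypothetical: its name, and the assumption that the admin token above may write ip_address directly, are not part of this commit.

    import arvados

    def fake_node_ping(node_uuid, cloud_node_id):
        # Assumes ARVADOS_API_HOST/ARVADOS_API_TOKEN point at the
        # development server and admin token from this config.
        api = arvados.api('v1')
        api.nodes().update(
            uuid=node_uuid,
            body={'ip_address': '10.10.0.{}'.format(cloud_node_id)},
        ).execute()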
diff --git a/services/nodemanager/setup.py b/services/nodemanager/setup.py
new file mode 100644
index 0000000..fabb883
--- /dev/null
+++ b/services/nodemanager/setup.py
@@ -0,0 +1,41 @@
+#!/usr/bin/env python
+
+import os
+import subprocess
+import time
+
+from setuptools import setup, find_packages
+
+SETUP_DIR = os.path.dirname(__file__) or "."
+cmd_opts = {'egg_info': {}}
+try:
+ git_tags = subprocess.check_output(
+ ['git', 'log', '--first-parent', '--max-count=1',
+ '--format=format:%ct %h', SETUP_DIR]).split()
+ assert len(git_tags) == 2
+except (AssertionError, OSError, subprocess.CalledProcessError):
+ pass
+else:
+ git_tags[0] = time.strftime('%Y%m%d%H%M%S', time.gmtime(int(git_tags[0])))
+ cmd_opts['egg_info']['tag_build'] = '.{}.{}'.format(*git_tags)
+
+setup(name='arvados-node-manager',
+ version='0.1',
+ description='Arvados compute node manager',
+ author='Arvados',
+ author_email='info@arvados.org',
+ url="https://arvados.org",
+ license='GNU Affero General Public License, version 3.0',
+ packages=find_packages(),
+ install_requires=[
+ 'apache-libcloud',
+ 'arvados-python-client',
+ 'pykka',
+ 'python-daemon',
+ ],
+ scripts=['bin/arvados-node-manager'],
+ test_suite='tests',
+ tests_require=['mock>=1.0'],
+ zip_safe=False,
+ options=cmd_opts,
+ )
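The egg_info hook above stamps each build with the newest commit's date and abbreviated hash. For a hypothetical commit at Unix time 1400000000 with short hash abc1234, the resulting version string works out like this:

    import time

    stamp = time.strftime('%Y%m%d%H%M%S', time.gmtime(1400000000))
    print('0.1.{}.{}'.format(stamp, 'abc1234'))
    # -> 0.1.20140513165320.abc1234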
diff --git a/services/nodemanager/tests/__init__.py b/services/nodemanager/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/services/nodemanager/tests/test_clientactor.py b/services/nodemanager/tests/test_clientactor.py
new file mode 100644
index 0000000..0db0a33
--- /dev/null
+++ b/services/nodemanager/tests/test_clientactor.py
@@ -0,0 +1,127 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import unittest
+
+import mock
+import pykka
+
+import arvnodeman.clientactor as clientactor
+from . import testutil
+
+class RemotePollLoopActorTestCase(testutil.RemotePollLoopActorTestMixin,
+ unittest.TestCase):
+ class MockClientError(Exception):
+ pass
+
+ class TestActor(clientactor.RemotePollLoopActor):
+ LOGGER_NAME = 'arvnodeman.testpoll'
+
+ def _send_request(self):
+ return self._client()
+ TestActor.CLIENT_ERRORS = (MockClientError,)
+ TEST_CLASS = TestActor
+
+
+ def build_monitor(self, side_effect, *args, **kwargs):
+ super(RemotePollLoopActorTestCase, self).build_monitor(*args, **kwargs)
+ self.client.side_effect = side_effect
+
+ def test_poll_loop_starts_after_subscription(self):
+ self.build_monitor(['test1'])
+ self.monitor.subscribe(self.subscriber)
+ self.wait_for_call(self.subscriber)
+ self.subscriber.assert_called_with('test1')
+ self.wait_for_call(self.timer.schedule)
+
+ def test_poll_loop_continues_after_failure(self):
+ self.build_monitor(self.MockClientError)
+ self.monitor.subscribe(self.subscriber)
+ self.wait_for_call(self.timer.schedule)
+ self.assertTrue(self.monitor.actor_ref.is_alive(),
+ "poll loop died after error")
+ self.assertFalse(self.subscriber.called,
+ "poll loop notified subscribers after error")
+
+ def test_late_subscribers_get_responses(self):
+ self.build_monitor(['late_test'])
+ self.monitor.subscribe(lambda response: None)
+ self.monitor.subscribe(self.subscriber)
+ self.monitor.poll()
+ self.wait_for_call(self.subscriber)
+ self.subscriber.assert_called_with('late_test')
+
+ def test_survive_dead_subscriptions(self):
+ self.build_monitor(['survive1', 'survive2'])
+ dead_subscriber = mock.Mock(name='dead_subscriber')
+ dead_subscriber.side_effect = pykka.ActorDeadError
+ self.monitor.subscribe(dead_subscriber)
+ self.wait_for_call(dead_subscriber)
+ self.monitor.subscribe(self.subscriber)
+ self.monitor.poll()
+ self.wait_for_call(self.subscriber)
+ self.subscriber.assert_called_with('survive2')
+ self.assertTrue(self.monitor.actor_ref.is_alive(),
+ "poll loop died from dead subscriber")
+
+ def test_no_subscriptions_by_key_without_support(self):
+ self.build_monitor([])
+ with self.assertRaises(AttributeError):
+ self.monitor.subscribe_to('key')
+
+
+class RemotePollLoopActorWithKeysTestCase(testutil.RemotePollLoopActorTestMixin,
+ unittest.TestCase):
+ class TestActor(RemotePollLoopActorTestCase.TestActor):
+ def _item_key(self, item):
+ return item['key']
+ TEST_CLASS = TestActor
+
+
+ def build_monitor(self, side_effect, *args, **kwargs):
+ super(RemotePollLoopActorWithKeysTestCase, self).build_monitor(
+ *args, **kwargs)
+ self.client.side_effect = side_effect
+
+ def test_key_subscription(self):
+ self.build_monitor([[{'key': 1}, {'key': 2}]])
+ self.monitor.subscribe_to(2, self.subscriber)
+ self.wait_for_call(self.subscriber)
+ self.subscriber.assert_called_with({'key': 2})
+
+ def test_survive_dead_key_subscriptions(self):
+ item = {'key': 3}
+ self.build_monitor([[item], [item]])
+ dead_subscriber = mock.Mock(name='dead_subscriber')
+ dead_subscriber.side_effect = pykka.ActorDeadError
+ self.monitor.subscribe_to(3, dead_subscriber)
+ self.wait_for_call(dead_subscriber)
+ self.monitor.subscribe_to(3, self.subscriber)
+ self.monitor.poll()
+ self.wait_for_call(self.subscriber)
+ self.subscriber.assert_called_with(item)
+ self.assertTrue(self.monitor.actor_ref.is_alive(),
+ "poll loop died from dead key subscriber")
+
+ def test_mixed_subscriptions(self):
+ item = {'key': 4}
+ self.build_monitor([[item], [item]])
+ key_subscriber = mock.Mock(name='key_subscriber')
+ self.monitor.subscribe(self.subscriber)
+ self.monitor.subscribe_to(4, key_subscriber)
+ self.monitor.poll()
+ self.wait_for_call(self.subscriber)
+ self.subscriber.assert_called_with([item])
+ key_subscriber.assert_called_with(item)
+
+ def test_subscription_to_missing_key(self):
+ self.build_monitor([[]])
+ self.monitor.subscribe_to('nonesuch', self.subscriber)
+ self.wait_for_call(self.subscriber)
+ self.subscriber.assert_called_with(None)
+
+
+if __name__ == '__main__':
+ unittest.main()
+
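The TestActor above doubles as a template for real pollers: subclass RemotePollLoopActor, set a logger name, and implement _send_request. A usage sketch under the same assumptions the tests make (a callable client object and a timer actor; MyPoller and its client are illustrative):

    import arvnodeman.clientactor as clientactor

    class MyPoller(clientactor.RemotePollLoopActor):
        LOGGER_NAME = 'arvnodeman.mypoller'
        CLIENT_ERRORS = (IOError,)  # exceptions treated as poll failures

        def _send_request(self):
            return self._client()  # one response per poll, as in TestActor

    # poller = MyPoller.start(client, timer_actor).proxy()
    # poller.subscribe(callback)   # callback receives each new response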
diff --git a/services/nodemanager/tests/test_computenode.py b/services/nodemanager/tests/test_computenode.py
new file mode 100644
index 0000000..2fc7a50
--- /dev/null
+++ b/services/nodemanager/tests/test_computenode.py
@@ -0,0 +1,272 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import time
+import unittest
+
+import arvados.errors as arverror
+import httplib2
+import mock
+import pykka
+
+import arvnodeman.computenode as cnode
+from . import testutil
+
+class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
+ def make_mocks(self, arvados_effect=None, cloud_effect=None):
+ if arvados_effect is None:
+ arvados_effect = testutil.arvados_node_mock()
+ self.timer = testutil.MockTimer()
+ self.api_client = mock.MagicMock(name='api_client')
+ self.api_client.nodes().create().execute.side_effect = arvados_effect
+ self.api_client.nodes().update().execute.side_effect = arvados_effect
+ self.cloud_client = mock.MagicMock(name='cloud_client')
+ self.cloud_client.create_node.return_value = testutil.cloud_node_mock(1)
+
+ def make_actor(self, arv_node=None):
+ if not hasattr(self, 'timer'):
+ self.make_mocks()
+ self.setup_actor = cnode.ComputeNodeSetupActor.start(
+ self.timer, self.api_client, self.cloud_client,
+ testutil.MockSize(1), arv_node).proxy()
+
+ def test_creation_without_arvados_node(self):
+ self.make_actor()
+ self.wait_for_call(self.api_client.nodes().create().execute)
+ self.wait_for_call(self.cloud_client.create_node)
+
+ def test_creation_with_arvados_node(self):
+ arv_node = testutil.arvados_node_mock()
+ self.make_mocks([arv_node])
+ self.make_actor(arv_node)
+ self.wait_for_call(self.api_client.nodes().update().execute)
+ self.wait_for_call(self.cloud_client.create_node)
+
+ def test_failed_calls_retried(self):
+ self.make_mocks([
+ arverror.ApiError(httplib2.Response({'status': '500'}), ""),
+ testutil.arvados_node_mock(),
+ ])
+ self.make_actor()
+ self.wait_for_call(self.cloud_client.create_node)
+
+ def test_stop_when_no_cloud_node(self):
+ self.make_mocks(
+ arverror.ApiError(httplib2.Response({'status': '500'}), ""))
+ self.make_actor()
+ self.wait_for_call(self.api_client.nodes().create().execute)
+ self.setup_actor.stop_if_no_cloud_node()
+ self.assertTrue(
+ self.setup_actor.actor_ref.actor_stopped.wait(self.TIMEOUT))
+
+ def test_no_stop_when_cloud_node(self):
+ self.make_actor()
+ self.wait_for_call(self.cloud_client.create_node)
+ self.setup_actor.stop_if_no_cloud_node().get(self.TIMEOUT)
+ self.assertFalse(self.setup_actor.actor_ref.actor_stopped.is_set())
+
+ def test_subscribe(self):
+ self.make_mocks(
+ arverror.ApiError(httplib2.Response({'status': '500'}), ""))
+ self.make_actor()
+ subscriber = mock.Mock(name='subscriber_mock')
+ self.setup_actor.subscribe(subscriber)
+ self.api_client.nodes().create().execute.side_effect = [
+ testutil.arvados_node_mock()]
+ self.wait_for_call(subscriber)
+ self.assertEqual(self.setup_actor.actor_ref.actor_urn,
+ subscriber.call_args[0][0].actor_ref.actor_urn)
+
+ def test_late_subscribe(self):
+ self.make_actor()
+ subscriber = mock.Mock(name='subscriber_mock')
+ self.wait_for_call(self.cloud_client.create_node)
+ self.setup_actor.subscribe(subscriber)
+ self.wait_for_call(subscriber)
+ self.assertEqual(self.setup_actor.actor_ref.actor_urn,
+ subscriber.call_args[0][0].actor_ref.actor_urn)
+
+
+class ComputeNodeShutdownActorTestCase(testutil.ActorTestMixin,
+ unittest.TestCase):
+ def make_mocks(self, cloud_node=None):
+ self.timer = testutil.MockTimer()
+ self.cloud_client = mock.MagicMock(name='cloud_client')
+ if cloud_node is None:
+ cloud_node = testutil.cloud_node_mock()
+ self.cloud_node = cloud_node
+
+ def make_actor(self, arv_node=None):
+ if not hasattr(self, 'timer'):
+ self.make_mocks()
+ self.shutdown_actor = cnode.ComputeNodeShutdownActor.start(
+ self.timer, self.cloud_client, self.cloud_node).proxy()
+
+ def test_easy_shutdown(self):
+ self.make_actor()
+ self.wait_for_call(self.cloud_client.destroy_node)
+
+
+class ComputeNodeUpdateActorTestCase(testutil.ActorTestMixin,
+ unittest.TestCase):
+ def make_actor(self):
+ self.driver = mock.MagicMock(name='driver_mock')
+ self.updater = cnode.ComputeNodeUpdateActor.start(self.driver).proxy()
+
+ def test_node_sync(self):
+ self.make_actor()
+ cloud_node = testutil.cloud_node_mock()
+ arv_node = testutil.arvados_node_mock()
+ self.updater.sync_node(cloud_node, arv_node).get(self.TIMEOUT)
+ self.driver().sync_node.assert_called_with(cloud_node, arv_node)
+
+
+@mock.patch('time.time', return_value=1)
+class ShutdownTimerTestCase(unittest.TestCase):
+ def test_two_length_window(self, time_mock):
+ timer = cnode.ShutdownTimer(time_mock.return_value, [8, 2])
+ self.assertEqual(481, timer.next_opening())
+ self.assertFalse(timer.window_open())
+ time_mock.return_value += 500
+ self.assertEqual(1081, timer.next_opening())
+ self.assertTrue(timer.window_open())
+ time_mock.return_value += 200
+ self.assertEqual(1081, timer.next_opening())
+ self.assertFalse(timer.window_open())
+
+ def test_three_length_window(self, time_mock):
+ timer = cnode.ShutdownTimer(time_mock.return_value, [6, 3, 1])
+ self.assertEqual(361, timer.next_opening())
+ self.assertFalse(timer.window_open())
+ time_mock.return_value += 400
+ self.assertEqual(961, timer.next_opening())
+ self.assertTrue(timer.window_open())
+ time_mock.return_value += 200
+ self.assertEqual(961, timer.next_opening())
+ self.assertFalse(timer.window_open())
+
+
+class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
+ unittest.TestCase):
+ class MockShutdownTimer(object):
+ def _set_state(self, is_open, next_opening):
+ self.window_open = lambda: is_open
+ self.next_opening = lambda: next_opening
+
+
+ def make_mocks(self, node_num):
+ self.shutdowns = self.MockShutdownTimer()
+ self.shutdowns._set_state(False, 300)
+ self.timer = mock.MagicMock(name='timer_mock')
+ self.updates = mock.MagicMock(name='update_mock')
+ self.cloud_mock = testutil.cloud_node_mock(node_num)
+ self.subscriber = mock.Mock(name='subscriber_mock')
+
+ def make_actor(self, node_num=1, arv_node=None, start_time=None):
+ if not hasattr(self, 'cloud_mock'):
+ self.make_mocks(node_num)
+ if start_time is None:
+ start_time = time.time()
+ self.node_actor = cnode.ComputeNodeMonitorActor.start(
+ self.cloud_mock, start_time, self.shutdowns, self.timer,
+ self.updates, arv_node).proxy()
+ self.node_actor.subscribe(self.subscriber)
+
+ def test_init_shutdown_scheduling(self):
+ self.make_actor()
+ self.wait_for_call(self.timer.schedule)
+ self.assertEqual(300, self.timer.schedule.call_args[0][0])
+
+ def test_shutdown_subscription(self):
+ self.make_actor()
+ self.shutdowns._set_state(True, 600)
+ self.node_actor.consider_shutdown()
+ self.wait_for_call(self.subscriber)
+ self.assertEqual(self.node_actor.actor_ref.actor_urn,
+ self.subscriber.call_args[0][0].actor_ref.actor_urn)
+
+ def test_shutdown_without_arvados_node(self):
+ self.make_actor()
+ self.shutdowns._set_state(True, 600)
+ self.node_actor.consider_shutdown()
+ self.wait_for_call(self.subscriber)
+
+ def test_no_shutdown_without_arvados_node_and_old_cloud_node(self):
+ self.make_actor(start_time=0)
+ self.shutdowns._set_state(True, 600)
+ self.node_actor.consider_shutdown()
+ self.assertFalse(self.subscriber.called)
+
+ def check_shutdown_rescheduled(self, window_open, next_window,
+ schedule_time=None):
+ self.shutdowns._set_state(window_open, next_window)
+ self.timer.schedule.reset_mock()
+ self.node_actor.consider_shutdown()
+ self.wait_for_call(self.timer.schedule)
+ if schedule_time is not None:
+ self.assertEqual(schedule_time, self.timer.schedule.call_args[0][0])
+ self.assertFalse(self.subscriber.called)
+
+ def test_shutdown_window_close_scheduling(self):
+ self.make_actor()
+ self.check_shutdown_rescheduled(False, 600, 600)
+
+ def test_no_shutdown_when_node_running_job(self):
+ self.make_actor(4, testutil.arvados_node_mock(4, job_uuid=True))
+ self.check_shutdown_rescheduled(True, 600)
+
+ def test_no_shutdown_when_node_state_unknown(self):
+ self.make_actor(5, testutil.arvados_node_mock(5, info={}))
+ self.check_shutdown_rescheduled(True, 600)
+
+ def test_no_shutdown_when_node_state_stale(self):
+ self.make_actor(6, testutil.arvados_node_mock(6, age=900))
+ self.check_shutdown_rescheduled(True, 600)
+
+ def test_arvados_node_match(self):
+ self.make_actor(2)
+ arv_node = testutil.arvados_node_mock(
+ 2, hostname='compute-two.zzzzz.arvadosapi.com')
+ pair_future = self.node_actor.offer_arvados_pair(arv_node)
+ self.assertEqual(self.cloud_mock.id, pair_future.get(self.TIMEOUT))
+ self.wait_for_call(self.updates.sync_node)
+ self.updates.sync_node.assert_called_with(self.cloud_mock, arv_node)
+
+ def test_arvados_node_mismatch(self):
+ self.make_actor(3)
+ arv_node = testutil.arvados_node_mock(1)
+ pair_future = self.node_actor.offer_arvados_pair(arv_node)
+ self.assertIsNone(pair_future.get(self.TIMEOUT))
+
+ def test_update_cloud_node(self):
+ self.make_actor(1)
+ self.make_mocks(2)
+ self.cloud_mock.id = '1'
+ self.node_actor.update_cloud_node(self.cloud_mock)
+ current_cloud = self.node_actor.cloud_node.get(self.TIMEOUT)
+ self.assertEqual([testutil.ip_address_mock(2)],
+ current_cloud.private_ips)
+
+ def test_missing_cloud_node_update(self):
+ self.make_actor(1)
+ self.node_actor.update_cloud_node(None)
+ current_cloud = self.node_actor.cloud_node.get(self.TIMEOUT)
+ self.assertEqual([testutil.ip_address_mock(1)],
+ current_cloud.private_ips)
+
+ def test_update_arvados_node(self):
+ self.make_actor(3)
+ job_uuid = 'zzzzz-jjjjj-updatejobnode00'
+ new_arvados = testutil.arvados_node_mock(3, job_uuid)
+ self.node_actor.update_arvados_node(new_arvados)
+ current_arvados = self.node_actor.arvados_node.get(self.TIMEOUT)
+ self.assertEqual(job_uuid, current_arvados['job_uuid'])
+
+ def test_missing_arvados_node_update(self):
+ self.make_actor(4, testutil.arvados_node_mock(4))
+ self.node_actor.update_arvados_node(None)
+ current_arvados = self.node_actor.arvados_node.get(self.TIMEOUT)
+ self.assertEqual(testutil.ip_address_mock(4),
+ current_arvados['ip_address'])
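ShutdownTimerTestCase above drives a fake clock by mutating the patched mock's return_value; the same pattern in isolation:

    import time

    import mock

    with mock.patch('time.time', return_value=0) as now:
        start = time.time()          # 0, served by the mock
        now.return_value += 500      # advance the fake clock 500 seconds
        print(time.time() - start)   # 500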
diff --git a/services/nodemanager/tests/test_computenode_ec2.py b/services/nodemanager/tests/test_computenode_ec2.py
new file mode 100644
index 0000000..d1c9e43
--- /dev/null
+++ b/services/nodemanager/tests/test_computenode_ec2.py
@@ -0,0 +1,89 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import time
+import unittest
+
+import mock
+
+import arvnodeman.computenode.ec2 as ec2
+from . import testutil
+
+class EC2ComputeNodeDriverTestCase(unittest.TestCase):
+ def setUp(self):
+ self.driver_mock = mock.MagicMock(name='driver_mock')
+
+ def new_driver(self, auth_kwargs={}, list_kwargs={}, create_kwargs={}):
+ create_kwargs.setdefault('ping_host', '100::')
+ return ec2.ComputeNodeDriver(
+ auth_kwargs, list_kwargs, create_kwargs,
+ driver_class=self.driver_mock)
+
+ def test_driver_instantiation(self):
+ kwargs = {'key': 'testkey'}
+ driver = self.new_driver(auth_kwargs=kwargs)
+ self.assertTrue(self.driver_mock.called)
+ self.assertEqual(kwargs, self.driver_mock.call_args[1])
+
+ def test_list_kwargs_become_filters(self):
+ # We're also testing tag name translation.
+ driver = self.new_driver(list_kwargs={'tag_test': 'true'})
+ driver.list_nodes()
+ list_method = self.driver_mock().list_nodes
+ self.assertTrue(list_method.called)
+ self.assertEqual({'tag:test': 'true'},
+ list_method.call_args[1].get('ex_filters'))
+
+ def test_create_location_loaded_at_initialization(self):
+ kwargs = {'location': 'testregion'}
+ driver = self.new_driver(create_kwargs=kwargs)
+ self.assertTrue(self.driver_mock().list_locations.called)
+
+ def test_create_image_loaded_at_initialization(self):
+ kwargs = {'image': 'testimage'}
+ driver = self.new_driver(create_kwargs=kwargs)
+ self.assertTrue(self.driver_mock().list_images.called)
+
+ def test_create_includes_ping_secret(self):
+ arv_node = testutil.arvados_node_mock(info={'ping_secret': 'ssshh'})
+ driver = self.new_driver()
+ driver.create_node(testutil.MockSize(1), arv_node)
+ create_method = self.driver_mock().create_node
+ self.assertTrue(create_method.called)
+ self.assertIn('ping_secret=ssshh',
+ create_method.call_args[1].get('ex_userdata',
+ 'arg missing'))
+
+ def test_tags_created_from_arvados_node(self):
+ arv_node = testutil.arvados_node_mock(8)
+ cloud_node = testutil.cloud_node_mock(8)
+ driver = self.new_driver(list_kwargs={'tag:list': 'test'})
+ self.assertEqual({'ex_metadata': {'list': 'test'},
+ 'name': 'compute8.zzzzz.arvadosapi.com'},
+ driver.arvados_create_kwargs(arv_node))
+
+ def test_tags_set_default_hostname_from_new_arvados_node(self):
+ arv_node = testutil.arvados_node_mock(hostname=None)
+ driver = self.new_driver()
+ actual = driver.arvados_create_kwargs(arv_node)
+ self.assertEqual('dynamic.compute.zzzzz.arvadosapi.com',
+ actual['name'])
+
+ def test_sync_node(self):
+ arv_node = testutil.arvados_node_mock(1)
+ cloud_node = testutil.cloud_node_mock(2)
+ driver = self.new_driver()
+ driver.sync_node(cloud_node, arv_node)
+ tag_mock = self.driver_mock().ex_create_tags
+ self.assertTrue(tag_mock.called)
+ self.assertEqual('compute1.zzzzz.arvadosapi.com',
+ tag_mock.call_args[0][1].get('Name', 'no name'))
+
+ def test_node_create_time(self):
+ refsecs = int(time.time())
+ reftuple = time.gmtime(refsecs)
+ node = testutil.cloud_node_mock()
+ node.extra = {'launch_time': time.strftime('%Y-%m-%dT%H:%M:%S.000Z',
+ reftuple)}
+ self.assertEqual(refsecs, ec2.ComputeNodeDriver.node_start_time(node))
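test_node_create_time pins down the launch_time format that node_start_time must parse. One way to satisfy it, as a sketch rather than the shipped parser in ec2.py:

    import calendar
    import time

    def node_start_time(node):
        # EC2 reports launch_time like '2014-05-13T16:53:20.000Z';
        # parse it as UTC back into Unix seconds.
        return calendar.timegm(time.strptime(
            node.extra['launch_time'], '%Y-%m-%dT%H:%M:%S.000Z'))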
diff --git a/services/nodemanager/tests/test_config.py b/services/nodemanager/tests/test_config.py
new file mode 100644
index 0000000..3aa9541
--- /dev/null
+++ b/services/nodemanager/tests/test_config.py
@@ -0,0 +1,65 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import io
+import logging
+import unittest
+
+import arvnodeman.config as nmconfig
+
+class NodeManagerConfigTestCase(unittest.TestCase):
+ TEST_CONFIG = u"""
+[Cloud]
+provider = dummy
+shutdown_windows = 52, 6, 2
+
+[Cloud Credentials]
+creds = dummy_creds
+
+[Cloud List]
+[Cloud Create]
+
+[Size 1]
+cores = 1
+
+[Logging]
+file = /dev/null
+level = DEBUG
+testlogger = INFO
+"""
+
+ def load_config(self, config=None, config_str=None):
+ if config is None:
+ config = nmconfig.NodeManagerConfig()
+ if config_str is None:
+ config_str = self.TEST_CONFIG
+ with io.StringIO(config_str) as config_fp:
+ config.readfp(config_fp)
+ return config
+
+ def test_seeded_defaults(self):
+ config = nmconfig.NodeManagerConfig()
+ sec_names = set(config.sections())
+ self.assertIn('Arvados', sec_names)
+ self.assertIn('Daemon', sec_names)
+ self.assertFalse(any(name.startswith('Size ') for name in sec_names))
+
+ def test_list_sizes(self):
+ config = self.load_config()
+ client = config.new_cloud_client()
+ sizes = config.node_sizes(client.list_sizes())
+ self.assertEqual(1, len(sizes))
+ size, kwargs = sizes[0]
+ self.assertEqual('Small', size.name)
+ self.assertEqual(1, kwargs['cores'])
+
+ def test_shutdown_windows(self):
+ config = self.load_config()
+ self.assertEqual([52, 6, 2], config.shutdown_windows())
+
+ def test_log_levels(self):
+ config = self.load_config()
+ self.assertEqual({'level': logging.DEBUG,
+ 'testlogger': logging.INFO},
+ config.log_levels())
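Outside the tests, the same class reads a file the way load_config above reads a string. A brief sketch (the path is illustrative):

    import io

    import arvnodeman.config as nmconfig

    config = nmconfig.NodeManagerConfig()
    with io.open('doc/local.example.cfg') as config_file:
        config.readfp(config_file)
    print(config.shutdown_windows())  # [1, 1] for the local example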
diff --git a/services/nodemanager/tests/test_daemon.py b/services/nodemanager/tests/test_daemon.py
new file mode 100644
index 0000000..176b096
--- /dev/null
+++ b/services/nodemanager/tests/test_daemon.py
@@ -0,0 +1,158 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import time
+import unittest
+
+import mock
+
+import arvnodeman.daemon as nmdaemon
+from . import testutil
+
+class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
+ unittest.TestCase):
+ def make_daemon(self, cloud_nodes=[], arvados_nodes=[], want_sizes=[]):
+ for name in ['cloud_nodes', 'arvados_nodes', 'server_wishlist']:
+ setattr(self, name + '_poller', mock.MagicMock(name=name + '_mock'))
+ self.arv_factory = mock.MagicMock(name='arvados_mock')
+ self.cloud_factory = mock.MagicMock(name='cloud_mock')
+ self.cloud_factory().node_start_time.return_value = time.time()
+ self.cloud_updates = mock.MagicMock(name='updates_mock')
+ self.timer = testutil.MockTimer()
+ self.node_factory = mock.MagicMock(name='factory_mock')
+ self.node_setup = mock.MagicMock(name='setup_mock')
+ self.node_shutdown = mock.MagicMock(name='shutdown_mock')
+ self.daemon = nmdaemon.NodeManagerDaemonActor.start(
+ self.server_wishlist_poller, self.arvados_nodes_poller,
+ self.cloud_nodes_poller, self.cloud_updates, self.timer,
+ self.arv_factory, self.cloud_factory,
+ [54, 5, 1], 8, 600, 3600,
+ self.node_setup, self.node_shutdown, self.node_factory).proxy()
+ if cloud_nodes is not None:
+ self.daemon.update_cloud_nodes(cloud_nodes)
+ if arvados_nodes is not None:
+ self.daemon.update_arvados_nodes(arvados_nodes)
+ if want_sizes is not None:
+ self.daemon.update_server_wishlist(want_sizes)
+
+ def test_easy_node_creation(self):
+ size = testutil.MockSize(1)
+ self.make_daemon(want_sizes=[size])
+ self.wait_for_call(self.node_setup.start)
+
+ def test_node_pairing(self):
+ cloud_node = testutil.cloud_node_mock(1)
+ arv_node = testutil.arvados_node_mock(1)
+ self.make_daemon([cloud_node], [arv_node])
+ self.wait_for_call(self.node_factory.start)
+ pair_func = self.node_factory.start().proxy().offer_arvados_pair
+ self.wait_for_call(pair_func)
+ pair_func.assert_called_with(arv_node)
+
+ def test_node_pairing_after_arvados_update(self):
+ cloud_node = testutil.cloud_node_mock(2)
+ arv_node = testutil.arvados_node_mock(2, ip_address=None)
+ self.make_daemon([cloud_node], None)
+ pair_func = self.node_factory.start().proxy().offer_arvados_pair
+ pair_func().get.return_value = None
+ self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
+ pair_func.assert_called_with(arv_node)
+
+ pair_func().get.return_value = cloud_node.id
+ pair_func.reset_mock()
+ arv_node = testutil.arvados_node_mock(2)
+ self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
+ pair_func.assert_called_with(arv_node)
+
+ def test_old_arvados_node_not_double_assigned(self):
+ arv_node = testutil.arvados_node_mock(3, age=9000)
+ size = testutil.MockSize(3)
+ self.make_daemon(arvados_nodes=[arv_node], want_sizes=[size, size])
+ node_starter = self.node_setup.start
+ deadline = time.time() + self.TIMEOUT
+ while (time.time() < deadline) and (node_starter.call_count < 2):
+ time.sleep(.1)
+ self.assertEqual(2, node_starter.call_count)
+ used_nodes = [call[1].get('arvados_node')
+ for call in node_starter.call_args_list]
+ self.assertIn(arv_node, used_nodes)
+ self.assertIn(None, used_nodes)
+
+ def test_node_count_satisfied(self):
+ self.make_daemon([testutil.cloud_node_mock()])
+ self.daemon.update_server_wishlist(
+ [testutil.MockSize(1)]).get(self.TIMEOUT)
+ self.assertFalse(self.node_setup.called)
+
+ def test_booting_nodes_counted(self):
+ cloud_node = testutil.cloud_node_mock(1)
+ arv_node = testutil.arvados_node_mock(1)
+ server_wishlist = [testutil.MockSize(1)] * 2
+ self.make_daemon([cloud_node], [arv_node], server_wishlist)
+ self.wait_for_call(self.node_setup.start)
+ self.node_setup.reset_mock()
+ self.daemon.update_server_wishlist(server_wishlist).get(self.TIMEOUT)
+ self.assertFalse(self.node_setup.called)
+
+ def test_no_duplication_when_booting_node_listed_fast(self):
+ # Test that we don't start two ComputeNodeMonitorActors when
+ # we learn about a booting node through a listing before we
+ # get the "node up" message from ComputeNodeSetupActor.
+ cloud_node = testutil.cloud_node_mock(1)
+ self.make_daemon(want_sizes=[testutil.MockSize(1)])
+ self.wait_for_call(self.node_setup.start)
+ setup = mock.MagicMock(name='setup_node_mock')
+ setup.actor_ref = self.node_setup.start().proxy().actor_ref
+ setup.cloud_node.get.return_value = cloud_node
+ setup.arvados_node.get.return_value = testutil.arvados_node_mock(1)
+ self.daemon.update_cloud_nodes([cloud_node])
+ self.wait_for_call(self.node_factory.start)
+ self.node_factory.reset_mock()
+ self.daemon.node_up(setup).get(self.TIMEOUT)
+ self.assertFalse(self.node_factory.start.called)
+
+ def test_booting_nodes_shut_down(self):
+ self.make_daemon(want_sizes=[testutil.MockSize(1)])
+ self.wait_for_call(self.node_setup.start)
+ self.daemon.update_server_wishlist([])
+ self.wait_for_call(
+ self.node_setup.start().proxy().stop_if_no_cloud_node)
+
+ def test_shutdown_declined_at_wishlist_capacity(self):
+ cloud_node = testutil.cloud_node_mock(1)
+ size = testutil.MockSize(1)
+ self.make_daemon(cloud_nodes=[cloud_node], want_sizes=[size])
+ node_actor = self.node_factory().proxy()
+ self.daemon.node_can_shutdown(node_actor).get(self.TIMEOUT)
+ self.assertFalse(node_actor.shutdown.called)
+
+ def test_shutdown_accepted_below_capacity(self):
+ self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
+ node_actor = self.node_factory().proxy()
+ self.daemon.node_can_shutdown(node_actor)
+ self.wait_for_call(self.node_shutdown.start)
+
+ def test_clean_shutdown_waits_for_node_setup_finish(self):
+ self.make_daemon(want_sizes=[testutil.MockSize(1)])
+ self.wait_for_call(self.node_setup.start)
+ new_node = self.node_setup.start().proxy()
+ self.daemon.shutdown()
+ self.wait_for_call(new_node.stop_if_no_cloud_node)
+ self.daemon.node_up(new_node)
+ self.wait_for_call(new_node.stop)
+ self.assertTrue(
+ self.daemon.actor_ref.actor_stopped.wait(self.TIMEOUT))
+
+ def test_wishlist_ignored_after_shutdown(self):
+ size = testutil.MockSize(2)
+ self.make_daemon(want_sizes=[size])
+ node_starter = self.node_setup.start
+ self.wait_for_call(node_starter)
+ node_starter.reset_mock()
+ self.daemon.shutdown()
+ self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
+ # Send another message and wait for a response, to make sure all
+ # internal messages generated by the wishlist update are processed.
+ self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
+ self.assertFalse(node_starter.called)
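test_old_arvados_node_not_double_assigned polls inline for a second call to the node starter. If more tests come to need that, it generalizes naturally into a testutil-style helper (hypothetical, not part of this commit):

    import time

    def wait_for_call_count(mock_func, count, timeout=5):
        # Poll until mock_func reaches `count` calls or we time out.
        deadline = time.time() + timeout
        while mock_func.call_count < count and time.time() < deadline:
            time.sleep(.1)
        return mock_func.call_count >= count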
diff --git a/services/nodemanager/tests/test_jobqueue.py b/services/nodemanager/tests/test_jobqueue.py
new file mode 100644
index 0000000..3814ba4
--- /dev/null
+++ b/services/nodemanager/tests/test_jobqueue.py
@@ -0,0 +1,74 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import unittest
+
+import arvnodeman.jobqueue as jobqueue
+from . import testutil
+
+class ServerCalculatorTestCase(unittest.TestCase):
+ def make_calculator(self, factors, **kwargs):
+ return jobqueue.ServerCalculator(
+ [(testutil.MockSize(n), {'cores': n}) for n in factors], **kwargs)
+
+ def calculate(self, servcalc, *constraints):
+ return servcalc.servers_for_queue(
+ [{'runtime_constraints': cdict} for cdict in constraints])
+
+ def test_empty_queue_needs_no_servers(self):
+ servcalc = self.make_calculator([1])
+ self.assertEqual([], servcalc.servers_for_queue([]))
+
+ def test_easy_server_count(self):
+ servcalc = self.make_calculator([1])
+ servlist = self.calculate(servcalc, {'min_nodes': 3})
+ self.assertEqual(3, len(servlist))
+
+ def test_implicit_server_count(self):
+ servcalc = self.make_calculator([1])
+ servlist = self.calculate(servcalc, {}, {'min_nodes': 3})
+ self.assertEqual(4, len(servlist))
+
+ def test_bad_min_nodes_override(self):
+ servcalc = self.make_calculator([1])
+ servlist = self.calculate(servcalc,
+ {'min_nodes': -2}, {'min_nodes': 'foo'})
+ self.assertEqual(2, len(servlist))
+
+ def test_ignore_unsatisfiable_jobs(self):
+ servcalc = self.make_calculator([1], max_nodes=9)
+ servlist = self.calculate(servcalc,
+ {'min_cores_per_node': 2},
+ {'min_ram_mb_per_node': 256},
+ {'min_nodes': 6},
+ {'min_nodes': 12},
+ {'min_scratch_mb_per_node': 200})
+ self.assertEqual(6, len(servlist))
+
+
+class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
+ unittest.TestCase):
+ TEST_CLASS = jobqueue.JobQueueMonitorActor
+
+ class MockCalculator(object):
+ @staticmethod
+ def servers_for_queue(queue):
+ return [testutil.MockSize(n) for n in queue]
+
+
+ def build_monitor(self, side_effect, *args, **kwargs):
+ super(JobQueueMonitorActorTestCase, self).build_monitor(*args, **kwargs)
+ self.client.jobs().queue().execute.side_effect = side_effect
+
+ def test_subscribers_get_server_lists(self):
+ self.build_monitor([{'items': [1, 2]}], self.MockCalculator())
+ self.monitor.subscribe(self.subscriber)
+ self.wait_for_call(self.subscriber)
+ self.subscriber.assert_called_with([testutil.MockSize(1),
+ testutil.MockSize(2)])
+
+
+if __name__ == '__main__':
+ unittest.main()
+
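Taken together, these tests spell out the ServerCalculator contract; a usage sketch reusing the test helpers (run from services/nodemanager so the imports resolve):

    import arvnodeman.jobqueue as jobqueue
    from tests import testutil

    servcalc = jobqueue.ServerCalculator(
        [(testutil.MockSize(1), {'cores': 1})])
    queue = [{'runtime_constraints': {'min_nodes': 3}}]
    print(len(servcalc.servers_for_queue(queue)))  # 3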
diff --git a/services/nodemanager/tests/test_nodelist.py b/services/nodemanager/tests/test_nodelist.py
new file mode 100644
index 0000000..d9f47e2
--- /dev/null
+++ b/services/nodemanager/tests/test_nodelist.py
@@ -0,0 +1,56 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import unittest
+
+import arvnodeman.nodelist as nodelist
+from . import testutil
+
+class ArvadosNodeListMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
+ unittest.TestCase):
+ TEST_CLASS = nodelist.ArvadosNodeListMonitorActor
+
+ def build_monitor(self, side_effect, *args, **kwargs):
+ super(ArvadosNodeListMonitorActorTestCase, self).build_monitor(
+ *args, **kwargs)
+ self.client.nodes().list().execute.side_effect = side_effect
+
+ def test_uuid_is_subscription_key(self):
+ node = testutil.arvados_node_mock()
+ self.build_monitor([{'items': [node]}])
+ self.monitor.subscribe_to(node['uuid'], self.subscriber)
+ self.wait_for_call(self.subscriber)
+ self.subscriber.assert_called_with(node)
+
+
+class CloudNodeListMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
+ unittest.TestCase):
+ TEST_CLASS = nodelist.CloudNodeListMonitorActor
+
+ class MockNode(object):
+ def __init__(self, count):
+ self.id = str(count)
+ self.name = 'test{}.example.com'.format(count)
+ self.private_ips = ['10.0.0.{}'.format(count)]
+ self.public_ips = []
+ self.size = None
+ self.state = 0
+
+
+ def build_monitor(self, side_effect, *args, **kwargs):
+ super(CloudNodeListMonitorActorTestCase, self).build_monitor(
+ *args, **kwargs)
+ self.client.list_nodes.side_effect = side_effect
+
+ def test_id_is_subscription_key(self):
+ node = self.MockNode(1)
+ self.build_monitor([[node]])
+ self.monitor.subscribe_to('1', self.subscriber)
+ self.wait_for_call(self.subscriber)
+ self.subscriber.assert_called_with(node)
+
+
+if __name__ == '__main__':
+ unittest.main()
+
diff --git a/services/nodemanager/tests/test_timedcallback.py b/services/nodemanager/tests/test_timedcallback.py
new file mode 100644
index 0000000..60f7b81
--- /dev/null
+++ b/services/nodemanager/tests/test_timedcallback.py
@@ -0,0 +1,55 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import time
+import unittest
+
+import mock
+import pykka
+
+import arvnodeman.timedcallback as timedcallback
+from . import testutil
+
+@testutil.no_sleep
+class TimedCallBackActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
+ def test_immediate_turnaround(self):
+ future = self.FUTURE_CLASS()
+ deliverer = timedcallback.TimedCallBackActor.start().proxy()
+ deliverer.schedule(time.time() - 1, future.set, 'immediate')
+ self.assertEqual('immediate', future.get(self.TIMEOUT))
+
+ def test_delayed_turnaround(self):
+ future = self.FUTURE_CLASS()
+ with mock.patch('time.time', return_value=0) as mock_now:
+ deliverer = timedcallback.TimedCallBackActor.start().proxy()
+ deliverer.schedule(1, future.set, 'delayed')
+ self.assertRaises(pykka.Timeout, future.get, .5)
+ mock_now.return_value = 2
+ self.assertEqual('delayed', future.get(self.TIMEOUT))
+
+ def test_out_of_order_scheduling(self):
+ future1 = self.FUTURE_CLASS()
+ future2 = self.FUTURE_CLASS()
+ with mock.patch('time.time', return_value=1.5) as mock_now:
+ deliverer = timedcallback.TimedCallBackActor.start().proxy()
+ deliverer.schedule(2, future2.set, 'second')
+ deliverer.schedule(1, future1.set, 'first')
+ self.assertEqual('first', future1.get(self.TIMEOUT))
+ self.assertRaises(pykka.Timeout, future2.get, .1)
+ mock_now.return_value = 3
+ self.assertEqual('second', future2.get(self.TIMEOUT))
+
+ def test_dead_actors_ignored(self):
+ receiver = mock.Mock(name='dead_actor', spec=pykka.ActorRef)
+ receiver.tell.side_effect = pykka.ActorDeadError
+ deliverer = timedcallback.TimedCallBackActor.start().proxy()
+ deliverer.schedule(time.time() - 1, receiver.tell, 'error')
+ self.wait_for_call(receiver.tell)
+ receiver.tell.assert_called_with('error')
+ self.assertTrue(deliverer.actor_ref.is_alive(), "deliverer died")
+
+
+if __name__ == '__main__':
+ unittest.main()
+
diff --git a/services/nodemanager/tests/testutil.py b/services/nodemanager/tests/testutil.py
new file mode 100644
index 0000000..a33f76f
--- /dev/null
+++ b/services/nodemanager/tests/testutil.py
@@ -0,0 +1,81 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import time
+
+import mock
+import pykka
+
+no_sleep = mock.patch('time.sleep', lambda n: None)
+
+def arvados_node_mock(node_num=99, job_uuid=None, age=0, **kwargs):
+ if job_uuid is True:
+ job_uuid = 'zzzzz-jjjjj-jobjobjobjobjob'
+ slurm_state = 'idle' if (job_uuid is None) else 'alloc'
+ node = {'uuid': 'zzzzz-yyyyy-12345abcde67890',
+ 'created_at': '2014-01-01T01:02:03Z',
+ 'modified_at': time.strftime('%Y-%m-%dT%H:%M:%SZ',
+ time.gmtime(time.time() - age)),
+ 'hostname': 'compute{}'.format(node_num),
+ 'domain': 'zzzzz.arvadosapi.com',
+ 'ip_address': ip_address_mock(node_num),
+ 'job_uuid': job_uuid,
+ 'info': {'slurm_state': slurm_state}}
+ node.update(kwargs)
+ return node
+
+def cloud_node_mock(node_num=99):
+ node = mock.NonCallableMagicMock(
+ ['id', 'name', 'state', 'public_ips', 'private_ips', 'driver', 'size',
+ 'image', 'extra'],
+ name='cloud_node')
+ node.id = str(node_num)
+ node.name = node.id
+ node.public_ips = []
+ node.private_ips = [ip_address_mock(node_num)]
+ return node
+
+def ip_address_mock(last_octet):
+ return '10.20.30.{}'.format(last_octet)
+
+class MockSize(object):
+ def __init__(self, factor):
+ self.id = 'z{}.test'.format(factor)
+ self.name = self.id
+ self.ram = 128 * factor
+ self.disk = 100 * factor
+ self.bandwidth = 16 * factor
+ self.price = float(factor)
+ self.extra = {}
+
+ def __eq__(self, other):
+ return self.id == other.id
+
+
+class MockTimer(object):
+ def schedule(self, want_time, callback, *args, **kwargs):
+ return callback(*args, **kwargs)
+
+
+class ActorTestMixin(object):
+ FUTURE_CLASS = pykka.ThreadingFuture
+ TIMEOUT = 5
+
+ def tearDown(self):
+ pykka.ActorRegistry.stop_all()
+
+ def wait_for_call(self, mock_func, timeout=TIMEOUT):
+ deadline = time.time() + timeout
+ while (not mock_func.called) and (time.time() < deadline):
+ time.sleep(.1)
+ self.assertTrue(mock_func.called, "{} not called".format(mock_func))
+
+
+class RemotePollLoopActorTestMixin(ActorTestMixin):
+ def build_monitor(self, *args, **kwargs):
+ self.timer = mock.MagicMock(name='timer_mock')
+ self.client = mock.MagicMock(name='client_mock')
+ self.subscriber = mock.Mock(name='subscriber_mock')
+ self.monitor = self.TEST_CLASS.start(
+ self.client, self.timer, *args, **kwargs).proxy()
-----------------------------------------------------------------------
hooks/post-receive