[ARVADOS] updated: 8f9f0dece977ccf5a778f3b3bd2379375e723c18
git at public.curoverse.com
git at public.curoverse.com
Wed Nov 12 13:59:50 EST 2014
Summary of changes:
.../nodemanager/arvnodeman/computenode/__init__.py | 353 ---------------------
.../computenode/{ => dispatch}/__init__.py | 123 +------
.../arvnodeman/computenode/driver/__init__.py | 64 ++++
.../arvnodeman/computenode/{ => driver}/dummy.py | 3 +-
.../arvnodeman/computenode/{ => driver}/ec2.py | 3 +-
services/nodemanager/arvnodeman/config.py | 2 +-
services/nodemanager/arvnodeman/daemon.py | 7 +-
services/nodemanager/arvnodeman/launcher.py | 4 +-
services/nodemanager/tests/test_computenode.py | 279 ----------------
...computenode.py => test_computenode_dispatch.py} | 35 +-
...enode_ec2.py => test_computenode_driver_ec2.py} | 2 +-
services/nodemanager/tests/test_daemon.py | 4 +-
12 files changed, 85 insertions(+), 794 deletions(-)
copy services/nodemanager/arvnodeman/computenode/{ => dispatch}/__init__.py (69%)
create mode 100644 services/nodemanager/arvnodeman/computenode/driver/__init__.py
rename services/nodemanager/arvnodeman/computenode/{ => driver}/dummy.py (96%)
rename services/nodemanager/arvnodeman/computenode/{ => driver}/ec2.py (98%)
copy services/nodemanager/tests/{test_computenode.py => test_computenode_dispatch.py} (89%)
rename services/nodemanager/tests/{test_computenode_ec2.py => test_computenode_driver_ec2.py} (98%)
via 8f9f0dece977ccf5a778f3b3bd2379375e723c18 (commit)
via b626a85eb86fd4909712852040cd305c71c37ee5 (commit)
from 5141c3ee23e89696773e227a93236ef2a51543c2 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 8f9f0dece977ccf5a778f3b3bd2379375e723c18
Merge: 5141c3e b626a85
Author: Brett Smith <brett at curoverse.com>
Date: Wed Nov 12 13:59:14 2014 -0500
Merge branch '4380-node-manager-computenode-reorg-wip'
Refs #4380. Closes #4496.
commit b626a85eb86fd4909712852040cd305c71c37ee5
Author: Brett Smith <brett at curoverse.com>
Date: Tue Nov 11 17:23:14 2014 -0500
4380: Reorganize arvnodeman.computenode.
This makes the hierarchy a little richer:
* arvnodeman.computenode.driver has all the cloud driver wrappers.
* arvnodeman.computenode.dispatch is organized the same way, but holds
the node setup, shutdown, update, and monitor actors, and is where
local dispatch concerns will live. For example, I'm going to add a
SLURM submodule here to take care of draining.
* arvnodeman.computenode still has utility functions and
ShutdownTimer.
diff --git a/services/nodemanager/arvnodeman/computenode/__init__.py b/services/nodemanager/arvnodeman/computenode/__init__.py
index 63effe9..4955992 100644
--- a/services/nodemanager/arvnodeman/computenode/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/__init__.py
@@ -2,16 +2,9 @@
from __future__ import absolute_import, print_function
-import functools
import itertools
-import logging
import time
-import pykka
-
-from ..clientactor import _notify_subscribers
-from .. import config
-
def arvados_node_fqdn(arvados_node, default_hostname='dynamic.compute'):
hostname = arvados_node.get('hostname') or default_hostname
return '{}.{}'.format(hostname, arvados_node['domain'])
@@ -23,252 +16,6 @@ def arvados_node_mtime(node):
def timestamp_fresh(timestamp, fresh_time):
return (time.time() - timestamp) < fresh_time
-class BaseComputeNodeDriver(object):
- """Abstract base class for compute node drivers.
-
- libcloud abstracts away many of the differences between cloud providers,
- but managing compute nodes requires some cloud-specific features (e.g.,
- on EC2 we use tags to identify compute nodes). Compute node drivers
- are responsible for translating the node manager's cloud requests to a
- specific cloud's vocabulary.
-
- Subclasses must implement arvados_create_kwargs (to update node
- creation kwargs with information about the specific Arvados node
- record), sync_node, and node_start_time.
- """
- def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class):
- self.real = driver_class(**auth_kwargs)
- self.list_kwargs = list_kwargs
- self.create_kwargs = create_kwargs
-
- def __getattr__(self, name):
- # Proxy non-extension methods to the real driver.
- if (not name.startswith('_') and not name.startswith('ex_')
- and hasattr(self.real, name)):
- return getattr(self.real, name)
- else:
- return super(BaseComputeNodeDriver, self).__getattr__(name)
-
- def search_for(self, term, list_method, key=lambda item: item.id):
- cache_key = (list_method, term)
- if cache_key not in self.SEARCH_CACHE:
- results = [item for item in getattr(self.real, list_method)()
- if key(item) == term]
- count = len(results)
- if count != 1:
- raise ValueError("{} returned {} results for '{}'".format(
- list_method, count, term))
- self.SEARCH_CACHE[cache_key] = results[0]
- return self.SEARCH_CACHE[cache_key]
-
- def list_nodes(self):
- return self.real.list_nodes(**self.list_kwargs)
-
- def arvados_create_kwargs(self, arvados_node):
- raise NotImplementedError("BaseComputeNodeDriver.arvados_create_kwargs")
-
- def create_node(self, size, arvados_node):
- kwargs = self.create_kwargs.copy()
- kwargs.update(self.arvados_create_kwargs(arvados_node))
- kwargs['size'] = size
- return self.real.create_node(**kwargs)
-
- def sync_node(self, cloud_node, arvados_node):
- # When a compute node first pings the API server, the API server
- # will automatically assign some attributes on the corresponding
- # node record, like hostname. This method should propagate that
- # information back to the cloud node appropriately.
- raise NotImplementedError("BaseComputeNodeDriver.sync_node")
-
- @classmethod
- def node_start_time(cls, node):
- raise NotImplementedError("BaseComputeNodeDriver.node_start_time")
-
-
-ComputeNodeDriverClass = BaseComputeNodeDriver
-
-class ComputeNodeStateChangeBase(config.actor_class):
- """Base class for actors that change a compute node's state.
-
- This base class takes care of retrying changes and notifying
- subscribers when the change is finished.
- """
- def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait):
- super(ComputeNodeStateChangeBase, self).__init__()
- self._later = self.actor_ref.proxy()
- self._timer = timer_actor
- self._logger = logging.getLogger(logger_name)
- self.min_retry_wait = retry_wait
- self.max_retry_wait = max_retry_wait
- self.retry_wait = retry_wait
- self.subscribers = set()
-
- @staticmethod
- def _retry(errors):
- """Retry decorator for an actor method that makes remote requests.
-
- Use this function to decorator an actor method, and pass in a
- tuple of exceptions to catch. This decorator will schedule
- retries of that method with exponential backoff if the
- original method raises any of the given errors.
- """
- def decorator(orig_func):
- @functools.wraps(orig_func)
- def wrapper(self, *args, **kwargs):
- try:
- orig_func(self, *args, **kwargs)
- except errors as error:
- self._logger.warning(
- "Client error: %s - waiting %s seconds",
- error, self.retry_wait)
- self._timer.schedule(self.retry_wait,
- getattr(self._later,
- orig_func.__name__),
- *args, **kwargs)
- self.retry_wait = min(self.retry_wait * 2,
- self.max_retry_wait)
- else:
- self.retry_wait = self.min_retry_wait
- return wrapper
- return decorator
-
- def _finished(self):
- _notify_subscribers(self._later, self.subscribers)
- self.subscribers = None
-
- def subscribe(self, subscriber):
- if self.subscribers is None:
- try:
- subscriber(self._later)
- except pykka.ActorDeadError:
- pass
- else:
- self.subscribers.add(subscriber)
-
-
-class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
- """Actor to create and set up a cloud compute node.
-
- This actor prepares an Arvados node record for a new compute node
- (either creating one or cleaning one passed in), then boots the
- actual compute node. It notifies subscribers when the cloud node
- is successfully created (the last step in the process for Node
- Manager to handle).
- """
- def __init__(self, timer_actor, arvados_client, cloud_client,
- cloud_size, arvados_node=None,
- retry_wait=1, max_retry_wait=180):
- super(ComputeNodeSetupActor, self).__init__(
- 'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
- self._arvados = arvados_client
- self._cloud = cloud_client
- self.cloud_size = cloud_size
- self.arvados_node = None
- self.cloud_node = None
- if arvados_node is None:
- self._later.create_arvados_node()
- else:
- self._later.prepare_arvados_node(arvados_node)
-
- @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
- def create_arvados_node(self):
- self.arvados_node = self._arvados.nodes().create(body={}).execute()
- self._later.create_cloud_node()
-
- @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
- def prepare_arvados_node(self, node):
- self.arvados_node = self._arvados.nodes().update(
- uuid=node['uuid'],
- body={'hostname': None,
- 'ip_address': None,
- 'slot_number': None,
- 'first_ping_at': None,
- 'last_ping_at': None,
- 'info': {'ec2_instance_id': None,
- 'last_action': "Prepared by Node Manager"}}
- ).execute()
- self._later.create_cloud_node()
-
- @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
- def create_cloud_node(self):
- self._logger.info("Creating cloud node with size %s.",
- self.cloud_size.name)
- self.cloud_node = self._cloud.create_node(self.cloud_size,
- self.arvados_node)
- self._logger.info("Cloud node %s created.", self.cloud_node.id)
- self._finished()
-
- def stop_if_no_cloud_node(self):
- if self.cloud_node is None:
- self.stop()
-
-
-class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
- """Actor to shut down a compute node.
-
- This actor simply destroys a cloud node, retrying as needed.
- """
- def __init__(self, timer_actor, cloud_client, cloud_node,
- retry_wait=1, max_retry_wait=180):
- super(ComputeNodeShutdownActor, self).__init__(
- 'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
- self._cloud = cloud_client
- self.cloud_node = cloud_node
- self._later.shutdown_node()
-
- @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
- def shutdown_node(self):
- self._cloud.destroy_node(self.cloud_node)
- self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
- self._finished()
-
-
-class ComputeNodeUpdateActor(config.actor_class):
- """Actor to dispatch one-off cloud management requests.
-
- This actor receives requests for small cloud updates, and
- dispatches them to a real driver. ComputeNodeMonitorActors use
- this to perform maintenance tasks on themselves. Having a
- dedicated actor for this gives us the opportunity to control the
- flow of requests; e.g., by backing off when errors occur.
-
- This actor is most like a "traditional" Pykka actor: there's no
- subscribing, but instead methods return real driver results. If
- you're interested in those results, you should get them from the
- Future that the proxy method returns. Be prepared to handle exceptions
- from the cloud driver when you do.
- """
- def __init__(self, cloud_factory, max_retry_wait=180):
- super(ComputeNodeUpdateActor, self).__init__()
- self._cloud = cloud_factory()
- self.max_retry_wait = max_retry_wait
- self.error_streak = 0
- self.next_request_time = time.time()
-
- def _throttle_errors(orig_func):
- @functools.wraps(orig_func)
- def wrapper(self, *args, **kwargs):
- throttle_time = self.next_request_time - time.time()
- if throttle_time > 0:
- time.sleep(throttle_time)
- self.next_request_time = time.time()
- try:
- result = orig_func(self, *args, **kwargs)
- except config.CLOUD_ERRORS:
- self.error_streak += 1
- self.next_request_time += min(2 ** self.error_streak,
- self.max_retry_wait)
- raise
- else:
- self.error_streak = 0
- return result
- return wrapper
-
- @_throttle_errors
- def sync_node(self, cloud_node, arvados_node):
- return self._cloud.sync_node(cloud_node, arvados_node)
-
-
class ShutdownTimer(object):
"""Keep track of a cloud node's shutdown windows.
@@ -309,103 +56,3 @@ class ShutdownTimer(object):
def window_open(self):
self._advance_opening()
return 0 < (time.time() - self._open_start) < self._open_for
-
-
-class ComputeNodeMonitorActor(config.actor_class):
- """Actor to manage a running compute node.
-
- This actor gets updates about a compute node's cloud and Arvados records.
- It uses this information to notify subscribers when the node is eligible
- for shutdown.
- """
- def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
- timer_actor, update_actor, arvados_node=None,
- poll_stale_after=600, node_stale_after=3600):
- super(ComputeNodeMonitorActor, self).__init__()
- self._later = self.actor_ref.proxy()
- self._logger = logging.getLogger('arvnodeman.computenode')
- self._last_log = None
- self._shutdowns = shutdown_timer
- self._timer = timer_actor
- self._update = update_actor
- self.cloud_node = cloud_node
- self.cloud_node_start_time = cloud_node_start_time
- self.poll_stale_after = poll_stale_after
- self.node_stale_after = node_stale_after
- self.subscribers = set()
- self.arvados_node = None
- self._later.update_arvados_node(arvados_node)
- self.last_shutdown_opening = None
- self._later.consider_shutdown()
-
- def subscribe(self, subscriber):
- self.subscribers.add(subscriber)
-
- def _debug(self, msg, *args):
- if msg == self._last_log:
- return
- self._last_log = msg
- self._logger.debug(msg, *args)
-
- def in_state(self, *states):
- # Return a boolean to say whether or not our Arvados node record is in
- # one of the given states. If state information is not
- # available--because this node has no Arvados record, the record is
- # stale, or the record has no state information--return None.
- if (self.arvados_node is None) or not timestamp_fresh(
- arvados_node_mtime(self.arvados_node), self.node_stale_after):
- return None
- state = self.arvados_node['info'].get('slurm_state')
- if not state:
- return None
- result = state in states
- if state == 'idle':
- result = result and not self.arvados_node['job_uuid']
- return result
-
- def _shutdown_eligible(self):
- if self.arvados_node is None:
- # If this is a new, unpaired node, it's eligible for
- # shutdown--we figure there was an error during bootstrap.
- return timestamp_fresh(self.cloud_node_start_time,
- self.node_stale_after)
- else:
- return self.in_state('idle')
-
- def consider_shutdown(self):
- next_opening = self._shutdowns.next_opening()
- if self._shutdowns.window_open():
- if self._shutdown_eligible():
- self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
- _notify_subscribers(self._later, self.subscribers)
- else:
- self._debug("Node %s shutdown window open but node busy.",
- self.cloud_node.id)
- else:
- self._debug("Node %s shutdown window closed. Next at %s.",
- self.cloud_node.id, time.ctime(next_opening))
- if self.last_shutdown_opening != next_opening:
- self._timer.schedule(next_opening, self._later.consider_shutdown)
- self.last_shutdown_opening = next_opening
-
- def offer_arvados_pair(self, arvados_node):
- if self.arvados_node is not None:
- return None
- elif arvados_node['ip_address'] in self.cloud_node.private_ips:
- self._later.update_arvados_node(arvados_node)
- return self.cloud_node.id
- else:
- return None
-
- def update_cloud_node(self, cloud_node):
- if cloud_node is not None:
- self.cloud_node = cloud_node
- self._later.consider_shutdown()
-
- def update_arvados_node(self, arvados_node):
- if arvados_node is not None:
- self.arvados_node = arvados_node
- new_hostname = arvados_node_fqdn(self.arvados_node)
- if new_hostname != self.cloud_node.name:
- self._update.sync_node(self.cloud_node, self.arvados_node)
- self._later.consider_shutdown()
diff --git a/services/nodemanager/arvnodeman/computenode/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
similarity index 69%
copy from services/nodemanager/arvnodeman/computenode/__init__.py
copy to services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
index 63effe9..d613ef1 100644
--- a/services/nodemanager/arvnodeman/computenode/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
@@ -3,89 +3,14 @@
from __future__ import absolute_import, print_function
import functools
-import itertools
import logging
import time
import pykka
-from ..clientactor import _notify_subscribers
-from .. import config
-
-def arvados_node_fqdn(arvados_node, default_hostname='dynamic.compute'):
- hostname = arvados_node.get('hostname') or default_hostname
- return '{}.{}'.format(hostname, arvados_node['domain'])
-
-def arvados_node_mtime(node):
- return time.mktime(time.strptime(node['modified_at'] + 'UTC',
- '%Y-%m-%dT%H:%M:%SZ%Z')) - time.timezone
-
-def timestamp_fresh(timestamp, fresh_time):
- return (time.time() - timestamp) < fresh_time
-
-class BaseComputeNodeDriver(object):
- """Abstract base class for compute node drivers.
-
- libcloud abstracts away many of the differences between cloud providers,
- but managing compute nodes requires some cloud-specific features (e.g.,
- on EC2 we use tags to identify compute nodes). Compute node drivers
- are responsible for translating the node manager's cloud requests to a
- specific cloud's vocabulary.
-
- Subclasses must implement arvados_create_kwargs (to update node
- creation kwargs with information about the specific Arvados node
- record), sync_node, and node_start_time.
- """
- def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class):
- self.real = driver_class(**auth_kwargs)
- self.list_kwargs = list_kwargs
- self.create_kwargs = create_kwargs
-
- def __getattr__(self, name):
- # Proxy non-extension methods to the real driver.
- if (not name.startswith('_') and not name.startswith('ex_')
- and hasattr(self.real, name)):
- return getattr(self.real, name)
- else:
- return super(BaseComputeNodeDriver, self).__getattr__(name)
-
- def search_for(self, term, list_method, key=lambda item: item.id):
- cache_key = (list_method, term)
- if cache_key not in self.SEARCH_CACHE:
- results = [item for item in getattr(self.real, list_method)()
- if key(item) == term]
- count = len(results)
- if count != 1:
- raise ValueError("{} returned {} results for '{}'".format(
- list_method, count, term))
- self.SEARCH_CACHE[cache_key] = results[0]
- return self.SEARCH_CACHE[cache_key]
-
- def list_nodes(self):
- return self.real.list_nodes(**self.list_kwargs)
-
- def arvados_create_kwargs(self, arvados_node):
- raise NotImplementedError("BaseComputeNodeDriver.arvados_create_kwargs")
-
- def create_node(self, size, arvados_node):
- kwargs = self.create_kwargs.copy()
- kwargs.update(self.arvados_create_kwargs(arvados_node))
- kwargs['size'] = size
- return self.real.create_node(**kwargs)
-
- def sync_node(self, cloud_node, arvados_node):
- # When a compute node first pings the API server, the API server
- # will automatically assign some attributes on the corresponding
- # node record, like hostname. This method should propagate that
- # information back to the cloud node appropriately.
- raise NotImplementedError("BaseComputeNodeDriver.sync_node")
-
- @classmethod
- def node_start_time(cls, node):
- raise NotImplementedError("BaseComputeNodeDriver.node_start_time")
-
-
-ComputeNodeDriverClass = BaseComputeNodeDriver
+from .. import arvados_node_fqdn, arvados_node_mtime, timestamp_fresh
+from ...clientactor import _notify_subscribers
+from ... import config
class ComputeNodeStateChangeBase(config.actor_class):
"""Base class for actors that change a compute node's state.
@@ -269,48 +194,6 @@ class ComputeNodeUpdateActor(config.actor_class):
return self._cloud.sync_node(cloud_node, arvados_node)
-class ShutdownTimer(object):
- """Keep track of a cloud node's shutdown windows.
-
- Instantiate this class with a timestamp of when a cloud node started,
- and a list of durations (in minutes) of when the node must not and may
- be shut down, alternating. The class will tell you when a shutdown
- window is open, and when the next open window will start.
- """
- def __init__(self, start_time, shutdown_windows):
- # The implementation is easiest if we have an even number of windows,
- # because then windows always alternate between open and closed.
- # Rig that up: calculate the first shutdown window based on what's
- # passed in. Then, if we were given an odd number of windows, merge
- # that first window into the last one, since they both# represent
- # closed state.
- first_window = shutdown_windows[0]
- shutdown_windows = list(shutdown_windows[1:])
- self._next_opening = start_time + (60 * first_window)
- if len(shutdown_windows) % 2:
- shutdown_windows.append(first_window)
- else:
- shutdown_windows[-1] += first_window
- self.shutdown_windows = itertools.cycle([60 * n
- for n in shutdown_windows])
- self._open_start = self._next_opening
- self._open_for = next(self.shutdown_windows)
-
- def _advance_opening(self):
- while self._next_opening < time.time():
- self._open_start = self._next_opening
- self._next_opening += self._open_for + next(self.shutdown_windows)
- self._open_for = next(self.shutdown_windows)
-
- def next_opening(self):
- self._advance_opening()
- return self._next_opening
-
- def window_open(self):
- self._advance_opening()
- return 0 < (time.time() - self._open_start) < self._open_for
-
-
class ComputeNodeMonitorActor(config.actor_class):
"""Actor to manage a running compute node.
diff --git a/services/nodemanager/arvnodeman/computenode/driver/__init__.py b/services/nodemanager/arvnodeman/computenode/driver/__init__.py
new file mode 100644
index 0000000..a20cfde
--- /dev/null
+++ b/services/nodemanager/arvnodeman/computenode/driver/__init__.py
@@ -0,0 +1,64 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+class BaseComputeNodeDriver(object):
+ """Abstract base class for compute node drivers.
+
+ libcloud abstracts away many of the differences between cloud providers,
+ but managing compute nodes requires some cloud-specific features (e.g.,
+ on EC2 we use tags to identify compute nodes). Compute node drivers
+ are responsible for translating the node manager's cloud requests to a
+ specific cloud's vocabulary.
+
+ Subclasses must implement arvados_create_kwargs (to update node
+ creation kwargs with information about the specific Arvados node
+ record), sync_node, and node_start_time.
+ """
+ def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class):
+ self.real = driver_class(**auth_kwargs)
+ self.list_kwargs = list_kwargs
+ self.create_kwargs = create_kwargs
+
+ def __getattr__(self, name):
+ # Proxy non-extension methods to the real driver.
+ if (not name.startswith('_') and not name.startswith('ex_')
+ and hasattr(self.real, name)):
+ return getattr(self.real, name)
+ else:
+ return super(BaseComputeNodeDriver, self).__getattr__(name)
+
+ def search_for(self, term, list_method, key=lambda item: item.id):
+ cache_key = (list_method, term)
+ if cache_key not in self.SEARCH_CACHE:
+ results = [item for item in getattr(self.real, list_method)()
+ if key(item) == term]
+ count = len(results)
+ if count != 1:
+ raise ValueError("{} returned {} results for '{}'".format(
+ list_method, count, term))
+ self.SEARCH_CACHE[cache_key] = results[0]
+ return self.SEARCH_CACHE[cache_key]
+
+ def list_nodes(self):
+ return self.real.list_nodes(**self.list_kwargs)
+
+ def arvados_create_kwargs(self, arvados_node):
+ raise NotImplementedError("BaseComputeNodeDriver.arvados_create_kwargs")
+
+ def create_node(self, size, arvados_node):
+ kwargs = self.create_kwargs.copy()
+ kwargs.update(self.arvados_create_kwargs(arvados_node))
+ kwargs['size'] = size
+ return self.real.create_node(**kwargs)
+
+ def sync_node(self, cloud_node, arvados_node):
+ # When a compute node first pings the API server, the API server
+ # will automatically assign some attributes on the corresponding
+ # node record, like hostname. This method should propagate that
+ # information back to the cloud node appropriately.
+ raise NotImplementedError("BaseComputeNodeDriver.sync_node")
+
+ @classmethod
+ def node_start_time(cls, node):
+ raise NotImplementedError("BaseComputeNodeDriver.node_start_time")
diff --git a/services/nodemanager/arvnodeman/computenode/dummy.py b/services/nodemanager/arvnodeman/computenode/driver/dummy.py
similarity index 96%
rename from services/nodemanager/arvnodeman/computenode/dummy.py
rename to services/nodemanager/arvnodeman/computenode/driver/dummy.py
index 6c39fea..3a286bb 100644
--- a/services/nodemanager/arvnodeman/computenode/dummy.py
+++ b/services/nodemanager/arvnodeman/computenode/driver/dummy.py
@@ -7,7 +7,8 @@ import time
import libcloud.compute.providers as cloud_provider
import libcloud.compute.types as cloud_types
-from . import BaseComputeNodeDriver, arvados_node_fqdn
+from . import BaseComputeNodeDriver
+from .. import arvados_node_fqdn
class ComputeNodeDriver(BaseComputeNodeDriver):
"""Compute node driver wrapper for libcloud's dummy driver.
diff --git a/services/nodemanager/arvnodeman/computenode/ec2.py b/services/nodemanager/arvnodeman/computenode/driver/ec2.py
similarity index 98%
rename from services/nodemanager/arvnodeman/computenode/ec2.py
rename to services/nodemanager/arvnodeman/computenode/driver/ec2.py
index 359bed4..c0992f7 100644
--- a/services/nodemanager/arvnodeman/computenode/ec2.py
+++ b/services/nodemanager/arvnodeman/computenode/driver/ec2.py
@@ -9,7 +9,8 @@ import libcloud.compute.providers as cloud_provider
import libcloud.compute.types as cloud_types
from libcloud.compute.drivers import ec2 as cloud_ec2
-from . import BaseComputeNodeDriver, arvados_node_fqdn
+from . import BaseComputeNodeDriver
+from .. import arvados_node_fqdn
### Monkeypatch libcloud to support AWS' new SecurityGroup API.
# These classes can be removed when libcloud support specifying
diff --git a/services/nodemanager/arvnodeman/config.py b/services/nodemanager/arvnodeman/config.py
index 754b931..24fd828 100644
--- a/services/nodemanager/arvnodeman/config.py
+++ b/services/nodemanager/arvnodeman/config.py
@@ -85,7 +85,7 @@ class NodeManagerConfig(ConfigParser.SafeConfigParser):
http=http)
def new_cloud_client(self):
- module = importlib.import_module('arvnodeman.computenode.' +
+ module = importlib.import_module('arvnodeman.computenode.driver.' +
self.get('Cloud', 'provider'))
auth_kwargs = self.get_section('Cloud Credentials')
if 'timeout' in auth_kwargs:
diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index eaf10be..7ff736f 100644
--- a/services/nodemanager/arvnodeman/daemon.py
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -9,6 +9,7 @@ import time
import pykka
from . import computenode as cnode
+from .computenode import dispatch
from .config import actor_class
class _ComputeNodeRecord(object):
@@ -96,9 +97,9 @@ class NodeManagerDaemonActor(actor_class):
arvados_factory, cloud_factory,
shutdown_windows, min_nodes, max_nodes,
poll_stale_after=600, node_stale_after=7200,
- node_setup_class=cnode.ComputeNodeSetupActor,
- node_shutdown_class=cnode.ComputeNodeShutdownActor,
- node_actor_class=cnode.ComputeNodeMonitorActor):
+ node_setup_class=dispatch.ComputeNodeSetupActor,
+ node_shutdown_class=dispatch.ComputeNodeShutdownActor,
+ node_actor_class=dispatch.ComputeNodeMonitorActor):
super(NodeManagerDaemonActor, self).__init__()
self._node_setup = node_setup_class
self._node_shutdown = node_shutdown_class
diff --git a/services/nodemanager/arvnodeman/launcher.py b/services/nodemanager/arvnodeman/launcher.py
index f4ad716..d2f4afe 100644
--- a/services/nodemanager/arvnodeman/launcher.py
+++ b/services/nodemanager/arvnodeman/launcher.py
@@ -12,9 +12,7 @@ import daemon
import pykka
from . import config as nmconfig
-from .computenode import \
- ComputeNodeSetupActor, ComputeNodeShutdownActor, ComputeNodeUpdateActor, \
- ShutdownTimer
+from .computenode.dispatch import ComputeNodeUpdateActor
from .daemon import NodeManagerDaemonActor
from .jobqueue import JobQueueMonitorActor, ServerCalculator
from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
diff --git a/services/nodemanager/tests/test_computenode.py b/services/nodemanager/tests/test_computenode.py
index 5ced5f9..e22cccc 100644
--- a/services/nodemanager/tests/test_computenode.py
+++ b/services/nodemanager/tests/test_computenode.py
@@ -6,137 +6,11 @@ import time
import unittest
import arvados.errors as arverror
-import httplib2
import mock
-import pykka
import arvnodeman.computenode as cnode
from . import testutil
-class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
- def make_mocks(self, arvados_effect=None, cloud_effect=None):
- if arvados_effect is None:
- arvados_effect = [testutil.arvados_node_mock()]
- self.arvados_effect = arvados_effect
- self.timer = testutil.MockTimer()
- self.api_client = mock.MagicMock(name='api_client')
- self.api_client.nodes().create().execute.side_effect = arvados_effect
- self.api_client.nodes().update().execute.side_effect = arvados_effect
- self.cloud_client = mock.MagicMock(name='cloud_client')
- self.cloud_client.create_node.return_value = testutil.cloud_node_mock(1)
-
- def make_actor(self, arv_node=None):
- if not hasattr(self, 'timer'):
- self.make_mocks(arvados_effect=[arv_node])
- self.setup_actor = cnode.ComputeNodeSetupActor.start(
- self.timer, self.api_client, self.cloud_client,
- testutil.MockSize(1), arv_node).proxy()
-
- def test_creation_without_arvados_node(self):
- self.make_actor()
- self.assertEqual(self.arvados_effect[-1],
- self.setup_actor.arvados_node.get(self.TIMEOUT))
- self.assertTrue(self.api_client.nodes().create().execute.called)
- self.assertEqual(self.cloud_client.create_node(),
- self.setup_actor.cloud_node.get(self.TIMEOUT))
-
- def test_creation_with_arvados_node(self):
- self.make_actor(testutil.arvados_node_mock())
- self.assertEqual(self.arvados_effect[-1],
- self.setup_actor.arvados_node.get(self.TIMEOUT))
- self.assertTrue(self.api_client.nodes().update().execute.called)
- self.assertEqual(self.cloud_client.create_node(),
- self.setup_actor.cloud_node.get(self.TIMEOUT))
-
- def test_failed_calls_retried(self):
- self.make_mocks([
- arverror.ApiError(httplib2.Response({'status': '500'}), ""),
- testutil.arvados_node_mock(),
- ])
- self.make_actor()
- self.wait_for_assignment(self.setup_actor, 'cloud_node')
-
- def test_stop_when_no_cloud_node(self):
- self.make_mocks(
- arverror.ApiError(httplib2.Response({'status': '500'}), ""))
- self.make_actor()
- self.setup_actor.stop_if_no_cloud_node()
- self.assertTrue(
- self.setup_actor.actor_ref.actor_stopped.wait(self.TIMEOUT))
-
- def test_no_stop_when_cloud_node(self):
- self.make_actor()
- self.wait_for_assignment(self.setup_actor, 'cloud_node')
- self.setup_actor.stop_if_no_cloud_node().get(self.TIMEOUT)
- self.assertTrue(self.stop_proxy(self.setup_actor),
- "actor was stopped by stop_if_no_cloud_node")
-
- def test_subscribe(self):
- self.make_mocks(
- arverror.ApiError(httplib2.Response({'status': '500'}), ""))
- self.make_actor()
- subscriber = mock.Mock(name='subscriber_mock')
- self.setup_actor.subscribe(subscriber)
- self.api_client.nodes().create().execute.side_effect = [
- testutil.arvados_node_mock()]
- self.wait_for_assignment(self.setup_actor, 'cloud_node')
- self.assertEqual(self.setup_actor.actor_ref.actor_urn,
- subscriber.call_args[0][0].actor_ref.actor_urn)
-
- def test_late_subscribe(self):
- self.make_actor()
- subscriber = mock.Mock(name='subscriber_mock')
- self.wait_for_assignment(self.setup_actor, 'cloud_node')
- self.setup_actor.subscribe(subscriber).get(self.TIMEOUT)
- self.stop_proxy(self.setup_actor)
- self.assertEqual(self.setup_actor.actor_ref.actor_urn,
- subscriber.call_args[0][0].actor_ref.actor_urn)
-
-
-class ComputeNodeShutdownActorTestCase(testutil.ActorTestMixin,
- unittest.TestCase):
- def make_mocks(self, cloud_node=None):
- self.timer = testutil.MockTimer()
- self.cloud_client = mock.MagicMock(name='cloud_client')
- if cloud_node is None:
- cloud_node = testutil.cloud_node_mock()
- self.cloud_node = cloud_node
-
- def make_actor(self, arv_node=None):
- if not hasattr(self, 'timer'):
- self.make_mocks()
- self.shutdown_actor = cnode.ComputeNodeShutdownActor.start(
- self.timer, self.cloud_client, self.cloud_node).proxy()
-
- def test_easy_shutdown(self):
- self.make_actor()
- self.shutdown_actor.cloud_node.get(self.TIMEOUT)
- self.stop_proxy(self.shutdown_actor)
- self.assertTrue(self.cloud_client.destroy_node.called)
-
- def test_late_subscribe(self):
- self.make_actor()
- subscriber = mock.Mock(name='subscriber_mock')
- self.shutdown_actor.subscribe(subscriber).get(self.TIMEOUT)
- self.stop_proxy(self.shutdown_actor)
- self.assertEqual(self.shutdown_actor.actor_ref.actor_urn,
- subscriber.call_args[0][0].actor_ref.actor_urn)
-
-
-class ComputeNodeUpdateActorTestCase(testutil.ActorTestMixin,
- unittest.TestCase):
- def make_actor(self):
- self.driver = mock.MagicMock(name='driver_mock')
- self.updater = cnode.ComputeNodeUpdateActor.start(self.driver).proxy()
-
- def test_node_sync(self):
- self.make_actor()
- cloud_node = testutil.cloud_node_mock()
- arv_node = testutil.arvados_node_mock()
- self.updater.sync_node(cloud_node, arv_node).get(self.TIMEOUT)
- self.driver().sync_node.assert_called_with(cloud_node, arv_node)
-
-
@mock.patch('time.time', return_value=1)
class ShutdownTimerTestCase(unittest.TestCase):
def test_two_length_window(self, time_mock):
@@ -160,156 +34,3 @@ class ShutdownTimerTestCase(unittest.TestCase):
time_mock.return_value += 200
self.assertEqual(961, timer.next_opening())
self.assertFalse(timer.window_open())
-
-
-class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
- unittest.TestCase):
- class MockShutdownTimer(object):
- def _set_state(self, is_open, next_opening):
- self.window_open = lambda: is_open
- self.next_opening = lambda: next_opening
-
-
- def make_mocks(self, node_num):
- self.shutdowns = self.MockShutdownTimer()
- self.shutdowns._set_state(False, 300)
- self.timer = mock.MagicMock(name='timer_mock')
- self.updates = mock.MagicMock(name='update_mock')
- self.cloud_mock = testutil.cloud_node_mock(node_num)
- self.subscriber = mock.Mock(name='subscriber_mock')
-
- def make_actor(self, node_num=1, arv_node=None, start_time=None):
- if not hasattr(self, 'cloud_mock'):
- self.make_mocks(node_num)
- if start_time is None:
- start_time = time.time()
- self.node_actor = cnode.ComputeNodeMonitorActor.start(
- self.cloud_mock, start_time, self.shutdowns, self.timer,
- self.updates, arv_node).proxy()
- self.node_actor.subscribe(self.subscriber).get(self.TIMEOUT)
-
- def node_state(self, *states):
- return self.node_actor.in_state(*states).get(self.TIMEOUT)
-
- def test_in_state_when_unpaired(self):
- self.make_actor()
- self.assertIsNone(self.node_state('idle', 'alloc'))
-
- def test_in_state_when_pairing_stale(self):
- self.make_actor(arv_node=testutil.arvados_node_mock(
- job_uuid=None, age=90000))
- self.assertIsNone(self.node_state('idle', 'alloc'))
-
- def test_in_state_when_no_state_available(self):
- self.make_actor(arv_node=testutil.arvados_node_mock(info={}))
- self.assertIsNone(self.node_state('idle', 'alloc'))
-
- def test_in_idle_state(self):
- self.make_actor(2, arv_node=testutil.arvados_node_mock(job_uuid=None))
- self.assertTrue(self.node_state('idle'))
- self.assertFalse(self.node_state('alloc'))
- self.assertTrue(self.node_state('idle', 'alloc'))
-
- def test_in_alloc_state(self):
- self.make_actor(3, arv_node=testutil.arvados_node_mock(job_uuid=True))
- self.assertFalse(self.node_state('idle'))
- self.assertTrue(self.node_state('alloc'))
- self.assertTrue(self.node_state('idle', 'alloc'))
-
- def test_init_shutdown_scheduling(self):
- self.make_actor()
- self.assertTrue(self.timer.schedule.called)
- self.assertEqual(300, self.timer.schedule.call_args[0][0])
-
- def test_shutdown_subscription(self):
- self.make_actor()
- self.shutdowns._set_state(True, 600)
- self.node_actor.consider_shutdown().get(self.TIMEOUT)
- self.assertTrue(self.subscriber.called)
- self.assertEqual(self.node_actor.actor_ref.actor_urn,
- self.subscriber.call_args[0][0].actor_ref.actor_urn)
-
- def test_shutdown_without_arvados_node(self):
- self.make_actor()
- self.shutdowns._set_state(True, 600)
- self.node_actor.consider_shutdown().get(self.TIMEOUT)
- self.assertTrue(self.subscriber.called)
-
- def test_no_shutdown_without_arvados_node_and_old_cloud_node(self):
- self.make_actor(start_time=0)
- self.shutdowns._set_state(True, 600)
- self.node_actor.consider_shutdown().get(self.TIMEOUT)
- self.assertFalse(self.subscriber.called)
-
- def check_shutdown_rescheduled(self, window_open, next_window,
- schedule_time=None):
- self.shutdowns._set_state(window_open, next_window)
- self.timer.schedule.reset_mock()
- self.node_actor.consider_shutdown().get(self.TIMEOUT)
- self.stop_proxy(self.node_actor)
- self.assertTrue(self.timer.schedule.called)
- if schedule_time is not None:
- self.assertEqual(schedule_time, self.timer.schedule.call_args[0][0])
- self.assertFalse(self.subscriber.called)
-
- def test_shutdown_window_close_scheduling(self):
- self.make_actor()
- self.check_shutdown_rescheduled(False, 600, 600)
-
- def test_no_shutdown_when_node_running_job(self):
- self.make_actor(4, testutil.arvados_node_mock(4, job_uuid=True))
- self.check_shutdown_rescheduled(True, 600)
-
- def test_no_shutdown_when_node_state_unknown(self):
- self.make_actor(5, testutil.arvados_node_mock(5, info={}))
- self.check_shutdown_rescheduled(True, 600)
-
- def test_no_shutdown_when_node_state_stale(self):
- self.make_actor(6, testutil.arvados_node_mock(6, age=90000))
- self.check_shutdown_rescheduled(True, 600)
-
- def test_arvados_node_match(self):
- self.make_actor(2)
- arv_node = testutil.arvados_node_mock(
- 2, hostname='compute-two.zzzzz.arvadosapi.com')
- pair_id = self.node_actor.offer_arvados_pair(arv_node).get(self.TIMEOUT)
- self.assertEqual(self.cloud_mock.id, pair_id)
- self.stop_proxy(self.node_actor)
- self.updates.sync_node.assert_called_with(self.cloud_mock, arv_node)
-
- def test_arvados_node_mismatch(self):
- self.make_actor(3)
- arv_node = testutil.arvados_node_mock(1)
- self.assertIsNone(
- self.node_actor.offer_arvados_pair(arv_node).get(self.TIMEOUT))
-
- def test_update_cloud_node(self):
- self.make_actor(1)
- self.make_mocks(2)
- self.cloud_mock.id = '1'
- self.node_actor.update_cloud_node(self.cloud_mock)
- current_cloud = self.node_actor.cloud_node.get(self.TIMEOUT)
- self.assertEqual([testutil.ip_address_mock(2)],
- current_cloud.private_ips)
-
- def test_missing_cloud_node_update(self):
- self.make_actor(1)
- self.node_actor.update_cloud_node(None)
- current_cloud = self.node_actor.cloud_node.get(self.TIMEOUT)
- self.assertEqual([testutil.ip_address_mock(1)],
- current_cloud.private_ips)
-
- def test_update_arvados_node(self):
- self.make_actor(3)
- job_uuid = 'zzzzz-jjjjj-updatejobnode00'
- new_arvados = testutil.arvados_node_mock(3, job_uuid)
- self.node_actor.update_arvados_node(new_arvados)
- current_arvados = self.node_actor.arvados_node.get(self.TIMEOUT)
- self.assertEqual(job_uuid, current_arvados['job_uuid'])
-
- def test_missing_arvados_node_update(self):
- self.make_actor(4, testutil.arvados_node_mock(4))
- self.node_actor.update_arvados_node(None)
- current_arvados = self.node_actor.arvados_node.get(self.TIMEOUT)
- self.assertEqual(testutil.ip_address_mock(4),
- current_arvados['ip_address'])
diff --git a/services/nodemanager/tests/test_computenode.py b/services/nodemanager/tests/test_computenode_dispatch.py
similarity index 89%
copy from services/nodemanager/tests/test_computenode.py
copy to services/nodemanager/tests/test_computenode_dispatch.py
index 5ced5f9..ece186b 100644
--- a/services/nodemanager/tests/test_computenode.py
+++ b/services/nodemanager/tests/test_computenode_dispatch.py
@@ -10,7 +10,7 @@ import httplib2
import mock
import pykka
-import arvnodeman.computenode as cnode
+import arvnodeman.computenode.dispatch as dispatch
from . import testutil
class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
@@ -28,7 +28,7 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
def make_actor(self, arv_node=None):
if not hasattr(self, 'timer'):
self.make_mocks(arvados_effect=[arv_node])
- self.setup_actor = cnode.ComputeNodeSetupActor.start(
+ self.setup_actor = dispatch.ComputeNodeSetupActor.start(
self.timer, self.api_client, self.cloud_client,
testutil.MockSize(1), arv_node).proxy()
@@ -105,7 +105,7 @@ class ComputeNodeShutdownActorTestCase(testutil.ActorTestMixin,
def make_actor(self, arv_node=None):
if not hasattr(self, 'timer'):
self.make_mocks()
- self.shutdown_actor = cnode.ComputeNodeShutdownActor.start(
+ self.shutdown_actor = dispatch.ComputeNodeShutdownActor.start(
self.timer, self.cloud_client, self.cloud_node).proxy()
def test_easy_shutdown(self):
@@ -127,7 +127,7 @@ class ComputeNodeUpdateActorTestCase(testutil.ActorTestMixin,
unittest.TestCase):
def make_actor(self):
self.driver = mock.MagicMock(name='driver_mock')
- self.updater = cnode.ComputeNodeUpdateActor.start(self.driver).proxy()
+ self.updater = dispatch.ComputeNodeUpdateActor.start(self.driver).proxy()
def test_node_sync(self):
self.make_actor()
@@ -137,31 +137,6 @@ class ComputeNodeUpdateActorTestCase(testutil.ActorTestMixin,
self.driver().sync_node.assert_called_with(cloud_node, arv_node)
-@mock.patch('time.time', return_value=1)
-class ShutdownTimerTestCase(unittest.TestCase):
- def test_two_length_window(self, time_mock):
- timer = cnode.ShutdownTimer(time_mock.return_value, [8, 2])
- self.assertEqual(481, timer.next_opening())
- self.assertFalse(timer.window_open())
- time_mock.return_value += 500
- self.assertEqual(1081, timer.next_opening())
- self.assertTrue(timer.window_open())
- time_mock.return_value += 200
- self.assertEqual(1081, timer.next_opening())
- self.assertFalse(timer.window_open())
-
- def test_three_length_window(self, time_mock):
- timer = cnode.ShutdownTimer(time_mock.return_value, [6, 3, 1])
- self.assertEqual(361, timer.next_opening())
- self.assertFalse(timer.window_open())
- time_mock.return_value += 400
- self.assertEqual(961, timer.next_opening())
- self.assertTrue(timer.window_open())
- time_mock.return_value += 200
- self.assertEqual(961, timer.next_opening())
- self.assertFalse(timer.window_open())
-
-
class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
unittest.TestCase):
class MockShutdownTimer(object):
@@ -183,7 +158,7 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
self.make_mocks(node_num)
if start_time is None:
start_time = time.time()
- self.node_actor = cnode.ComputeNodeMonitorActor.start(
+ self.node_actor = dispatch.ComputeNodeMonitorActor.start(
self.cloud_mock, start_time, self.shutdowns, self.timer,
self.updates, arv_node).proxy()
self.node_actor.subscribe(self.subscriber).get(self.TIMEOUT)
diff --git a/services/nodemanager/tests/test_computenode_ec2.py b/services/nodemanager/tests/test_computenode_driver_ec2.py
similarity index 98%
rename from services/nodemanager/tests/test_computenode_ec2.py
rename to services/nodemanager/tests/test_computenode_driver_ec2.py
index d1c9e43..fde103e 100644
--- a/services/nodemanager/tests/test_computenode_ec2.py
+++ b/services/nodemanager/tests/test_computenode_driver_ec2.py
@@ -7,7 +7,7 @@ import unittest
import mock
-import arvnodeman.computenode.ec2 as ec2
+import arvnodeman.computenode.driver.ec2 as ec2
from . import testutil
class EC2ComputeNodeDriverTestCase(unittest.TestCase):
diff --git a/services/nodemanager/tests/test_daemon.py b/services/nodemanager/tests/test_daemon.py
index 4bffd09..394fb88 100644
--- a/services/nodemanager/tests/test_daemon.py
+++ b/services/nodemanager/tests/test_daemon.py
@@ -8,8 +8,8 @@ import unittest
import mock
import pykka
-import arvnodeman.computenode as nmcnode
import arvnodeman.daemon as nmdaemon
+from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
from . import testutil
class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
@@ -39,7 +39,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
self.daemon.update_server_wishlist(want_sizes).get(self.TIMEOUT)
def monitor_list(self):
- return pykka.ActorRegistry.get_by_class(nmcnode.ComputeNodeMonitorActor)
+ return pykka.ActorRegistry.get_by_class(ComputeNodeMonitorActor)
def alive_monitor_count(self):
return sum(1 for actor in self.monitor_list() if actor.is_alive())
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list