[ARVADOS] created: 31adfc807ddaea42f30b1ae425e4be266f81e1b5
git at public.curoverse.com
git at public.curoverse.com
Thu Nov 13 16:25:18 EST 2014
at 31adfc807ddaea42f30b1ae425e4be266f81e1b5 (commit)
commit 31adfc807ddaea42f30b1ae425e4be266f81e1b5
Author: Brett Smith <brett at curoverse.com>
Date: Thu Nov 13 15:34:54 2014 -0500
4380: Add SLURM dispatcher to Node Manager.
diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py b/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
new file mode 100644
index 0000000..27397e5
--- /dev/null
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
@@ -0,0 +1,49 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import subprocess
+import time
+
+from . import \
+ ComputeNodeSetupActor, ComputeNodeUpdateActor, ComputeNodeMonitorActor
+from . import ComputeNodeShutdownActor as ShutdownActorBase
+
+class ComputeNodeShutdownActor(ShutdownActorBase):
+ def on_start(self):
+ arv_node = self._monitor.arvados_node.get()
+ if arv_node is None:
+ return super(ComputeNodeShutdownActor, self).on_start()
+ else:
+ self._nodename = arv_node['hostname']
+ self._logger.info("Draining SLURM node %s", self._nodename)
+ self._later.issue_slurm_drain()
+
+ def _set_node_state(self, state, *args):
+ cmd = ['scontrol', 'update', 'NodeName=' + self._nodename,
+ 'State=' + state]
+ cmd.extend(args)
+ subprocess.check_output(cmd)
+
+ @ShutdownActorBase._retry((subprocess.CalledProcessError,))
+ def cancel_shutdown(self):
+ self._set_node_state('RESUME')
+ return super(ComputeNodeShutdownActor, self).cancel_shutdown()
+
+ @ShutdownActorBase._stop_if_window_closed
+ @ShutdownActorBase._retry((subprocess.CalledProcessError,))
+ def issue_slurm_drain(self):
+ self._set_node_state('DRAIN', 'Reason=Node Manager shutdown')
+ self._logger.info("Waiting for SLURM node %s to drain", self._nodename)
+ self._later.await_slurm_drain()
+
+ @ShutdownActorBase._stop_if_window_closed
+ @ShutdownActorBase._retry((subprocess.CalledProcessError,))
+ def await_slurm_drain(self):
+ output = subprocess.check_output(
+ ['sinfo', '--noheader', '-o', '%t', '-n', self._nodename])
+ if output == 'drain\n':
+ self._later.shutdown_node()
+ else:
+ self._timer.schedule(time.time() + 10,
+ self._later.await_slurm_drain)
diff --git a/services/nodemanager/arvnodeman/config.py b/services/nodemanager/arvnodeman/config.py
index 24fd828..079e623 100644
--- a/services/nodemanager/arvnodeman/config.py
+++ b/services/nodemanager/arvnodeman/config.py
@@ -68,6 +68,17 @@ class NodeManagerConfig(ConfigParser.SafeConfigParser):
for key in self.options('Logging')
if key not in self.LOGGING_NONLEVELS}
+ def dispatch_classes(self):
+ mod_name = 'arvnodeman.computenode.dispatch'
+ if self.has_option('Daemon', 'dispatcher'):
+ mod_name = '{}.{}'.format(mod_name,
+ self.get('Daemon', 'dispatcher'))
+ module = importlib.import_module(mod_name)
+ return (module.ComputeNodeSetupActor,
+ module.ComputeNodeShutdownActor,
+ module.ComputeNodeUpdateActor,
+ module.ComputeNodeMonitorActor)
+
def new_arvados_client(self):
if self.has_option('Daemon', 'certs_file'):
certs_file = self.get('Daemon', 'certs_file')
diff --git a/services/nodemanager/arvnodeman/launcher.py b/services/nodemanager/arvnodeman/launcher.py
index d2f4afe..9f5e162 100644
--- a/services/nodemanager/arvnodeman/launcher.py
+++ b/services/nodemanager/arvnodeman/launcher.py
@@ -12,7 +12,6 @@ import daemon
import pykka
from . import config as nmconfig
-from .computenode.dispatch import ComputeNodeUpdateActor
from .daemon import NodeManagerDaemonActor
from .jobqueue import JobQueueMonitorActor, ServerCalculator
from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
@@ -107,10 +106,11 @@ def main(args=None):
signal.signal(sigcode, shutdown_signal)
setup_logging(config.get('Logging', 'file'), **config.log_levels())
+ node_setup, node_shutdown, node_update, node_monitor = \
+ config.dispatch_classes()
timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
launch_pollers(config)
- cloud_node_updater = ComputeNodeUpdateActor.start(
- config.new_cloud_client).proxy()
+ cloud_node_updater = node_update.start(config.new_cloud_client).proxy()
node_daemon = NodeManagerDaemonActor.start(
job_queue_poller, arvados_node_poller, cloud_node_poller,
cloud_node_updater, timer,
@@ -119,7 +119,8 @@ def main(args=None):
config.getint('Daemon', 'min_nodes'),
config.getint('Daemon', 'max_nodes'),
config.getint('Daemon', 'poll_stale_after'),
- config.getint('Daemon', 'node_stale_after')).proxy()
+ config.getint('Daemon', 'node_stale_after'),
+ node_setup, node_shutdown, node_monitor).proxy()
signal.pause()
daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
diff --git a/services/nodemanager/doc/ec2.example.cfg b/services/nodemanager/doc/ec2.example.cfg
index f4b27af..0f9caca 100644
--- a/services/nodemanager/doc/ec2.example.cfg
+++ b/services/nodemanager/doc/ec2.example.cfg
@@ -2,6 +2,11 @@
# All times are in seconds unless specified otherwise.
[Daemon]
+# The dispatcher can customize the start and stop procedure for
+# cloud nodes. For example, the SLURM dispatcher drains nodes
+# through SLURM before shutting them down.
+#dispatcher = slurm
+
# Node Manager will ensure that there are at least this many nodes
# running at all times.
min_nodes = 0
diff --git a/services/nodemanager/tests/test_computenode_dispatch.py b/services/nodemanager/tests/test_computenode_dispatch.py
index b8239f3..7f6988d 100644
--- a/services/nodemanager/tests/test_computenode_dispatch.py
+++ b/services/nodemanager/tests/test_computenode_dispatch.py
@@ -93,8 +93,7 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
subscriber.call_args[0][0].actor_ref.actor_urn)
-class ComputeNodeShutdownActorTestCase(testutil.ActorTestMixin,
- unittest.TestCase):
+class ComputeNodeShutdownActorMixin(testutil.ActorTestMixin):
def make_mocks(self, cloud_node=None, arvados_node=None,
shutdown_open=True):
self.timer = testutil.MockTimer()
@@ -113,7 +112,7 @@ class ComputeNodeShutdownActorTestCase(testutil.ActorTestMixin,
monitor_actor = dispatch.ComputeNodeMonitorActor.start(
self.cloud_node, time.time(), self.shutdowns, self.timer,
self.updates, self.arvados_node)
- self.shutdown_actor = dispatch.ComputeNodeShutdownActor.start(
+ self.shutdown_actor = self.ACTOR_CLASS.start(
self.timer, self.cloud_client, monitor_actor).proxy()
self.monitor_actor = monitor_actor.proxy()
@@ -127,6 +126,11 @@ class ComputeNodeShutdownActorTestCase(testutil.ActorTestMixin,
else:
self.fail("success flag {} is not {}".format(last_flag, expected))
+
+class ComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
+ unittest.TestCase):
+ ACTOR_CLASS = dispatch.ComputeNodeShutdownActor
+
def test_easy_shutdown(self):
self.make_actor()
self.check_success_flag(True)
diff --git a/services/nodemanager/tests/test_computenode_dispatch_slurm.py b/services/nodemanager/tests/test_computenode_dispatch_slurm.py
new file mode 100644
index 0000000..ccac8b2
--- /dev/null
+++ b/services/nodemanager/tests/test_computenode_dispatch_slurm.py
@@ -0,0 +1,58 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import subprocess
+import unittest
+
+import mock
+
+import arvnodeman.computenode.dispatch.slurm as slurm_dispatch
+from . import testutil
+from .test_computenode_dispatch import ComputeNodeShutdownActorMixin
+
+@mock.patch('subprocess.check_output')
+class SLURMComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
+ unittest.TestCase):
+ ACTOR_CLASS = slurm_dispatch.ComputeNodeShutdownActor
+
+ def check_slurm_got_args(self, proc_mock, *args):
+ self.assertTrue(proc_mock.called)
+ slurm_cmd = proc_mock.call_args[0][0]
+ for s in args:
+ self.assertIn(s, slurm_cmd)
+
+ def check_success_after_reset(self, proc_mock):
+ self.make_mocks(arvados_node=testutil.arvados_node_mock(63))
+ self.make_actor()
+ self.check_success_flag(None, 0)
+ self.check_success_flag(None, 0)
+ # Order is critical here: if the mock gets called when no return value
+ # or side effect is set, we may invoke a real subprocess.
+ proc_mock.return_value = 'drain\n'
+ proc_mock.side_effect = None
+ self.check_success_flag(True, 3)
+ self.check_slurm_got_args(proc_mock, 'compute63')
+
+ def test_wait_for_drained_state(self, proc_mock):
+ proc_mock.return_value = 'drng\n'
+ self.check_success_after_reset(proc_mock)
+
+ def test_retry_failed_slurm_calls(self, proc_mock):
+ proc_mock.side_effect = subprocess.CalledProcessError(1, ["mock"])
+ self.check_success_after_reset(proc_mock)
+
+ def test_slurm_bypassed_when_no_arvados_node(self, proc_mock):
+ # Test we correctly handle a node that failed to bootstrap.
+ proc_mock.return_value = 'idle\n'
+ self.make_actor()
+ self.check_success_flag(True)
+ self.assertFalse(proc_mock.called)
+
+ def test_node_undrained_when_shutdown_window_closes(self, proc_mock):
+ proc_mock.return_value = 'alloc\n'
+ self.make_mocks(arvados_node=testutil.arvados_node_mock(job_uuid=True))
+ self.make_actor()
+ self.check_success_flag(False, 2)
+ self.check_slurm_got_args(proc_mock, 'NodeName=compute99',
+ 'State=RESUME')
diff --git a/services/nodemanager/tests/test_config.py b/services/nodemanager/tests/test_config.py
index 3aa9541..d43491e 100644
--- a/services/nodemanager/tests/test_config.py
+++ b/services/nodemanager/tests/test_config.py
@@ -6,6 +6,8 @@ import io
import logging
import unittest
+import arvnodeman.computenode.dispatch as dispatch
+import arvnodeman.computenode.dispatch.slurm as slurm_dispatch
import arvnodeman.config as nmconfig
class NodeManagerConfigTestCase(unittest.TestCase):
@@ -63,3 +65,19 @@ testlogger = INFO
self.assertEqual({'level': logging.DEBUG,
'testlogger': logging.INFO},
config.log_levels())
+
+ def check_dispatch_classes(self, config, module):
+ setup, shutdown, update, monitor = config.dispatch_classes()
+ self.assertIs(setup, module.ComputeNodeSetupActor)
+ self.assertIs(shutdown, module.ComputeNodeShutdownActor)
+ self.assertIs(update, module.ComputeNodeUpdateActor)
+ self.assertIs(monitor, module.ComputeNodeMonitorActor)
+
+ def test_default_dispatch(self):
+ config = self.load_config()
+ self.check_dispatch_classes(config, dispatch)
+
+ def test_custom_dispatch(self):
+ config = self.load_config(
+ config_str=self.TEST_CONFIG + "[Daemon]\ndispatcher=slurm\n")
+ self.check_dispatch_classes(config, slurm_dispatch)
commit ce4d36c44a7418d994db2061bd53bbb64ce7949c
Author: Brett Smith <brett at curoverse.com>
Date: Thu Nov 13 15:44:19 2014 -0500
4380: Fix retry scheduling for Node Manager change methods.
diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
index 8d0e6c3..ae0a65b 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
@@ -41,13 +41,14 @@ class ComputeNodeStateChangeBase(config.actor_class):
def decorator(orig_func):
@functools.wraps(orig_func)
def wrapper(self, *args, **kwargs):
+ start_time = time.time()
try:
orig_func(self, *args, **kwargs)
except errors as error:
self._logger.warning(
"Client error: %s - waiting %s seconds",
error, self.retry_wait)
- self._timer.schedule(self.retry_wait,
+ self._timer.schedule(start_time + self.retry_wait,
getattr(self._later,
orig_func.__name__),
*args, **kwargs)
commit 7e9c860fd07443c1e7aac9250fd8fc6bde2ea638
Author: Brett Smith <brett at curoverse.com>
Date: Thu Nov 13 12:58:14 2014 -0500
4380: Node Manager shutdown actor is more robust.
ComputeNodeShutdownActor now checks that destroying the cloud node
succeeds. Before retrying, it will check if the node is still
eligible for shutdown, and abort if not.
diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
index 4dc3dcb..8d0e6c3 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
@@ -6,6 +6,7 @@ import functools
import logging
import time
+import libcloud.common.types as cloud_types
import pykka
from .. import arvados_node_fqdn, arvados_node_mtime, timestamp_fresh
@@ -133,19 +134,48 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
This actor simply destroys a cloud node, retrying as needed.
"""
- def __init__(self, timer_actor, cloud_client, cloud_node,
+ def __init__(self, timer_actor, cloud_client, node_monitor,
retry_wait=1, max_retry_wait=180):
super(ComputeNodeShutdownActor, self).__init__(
'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
self._cloud = cloud_client
- self.cloud_node = cloud_node
+ self._monitor = node_monitor.proxy()
+ self.cloud_node = self._monitor.cloud_node.get()
+ self.success = None
+
+ def on_start(self):
self._later.shutdown_node()
+ def cancel_shutdown(self):
+ self.success = False
+ self._finished()
+
+ def _stop_if_window_closed(orig_func):
+ @functools.wraps(orig_func)
+ def wrapper(self, *args, **kwargs):
+ if not self._monitor.shutdown_eligible().get():
+ self._logger.info(
+ "Cloud node %s shutdown cancelled - no longer eligible.",
+ self.cloud_node.id)
+ self._later.cancel_shutdown()
+ return None
+ else:
+ return orig_func(self, *args, **kwargs)
+ return wrapper
+
+ @_stop_if_window_closed
@ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
def shutdown_node(self):
- self._cloud.destroy_node(self.cloud_node)
- self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
- self._finished()
+ if self._cloud.destroy_node(self.cloud_node):
+ self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
+ self.success = True
+ self._finished()
+ else:
+ # Force a retry.
+ raise cloud_types.LibcloudError("destroy_node failed")
+
+ # Make the decorator available to subclasses.
+ _stop_if_window_closed = staticmethod(_stop_if_window_closed)
class ComputeNodeUpdateActor(config.actor_class):
diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index 7ff736f..9f22568 100644
--- a/services/nodemanager/arvnodeman/daemon.py
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -190,9 +190,8 @@ class NodeManagerDaemonActor(actor_class):
break
def _nodes_up(self):
- up = sum(len(nodelist) for nodelist in
- [self.cloud_nodes, self.booted, self.booting])
- return up - len(self.shutdowns)
+ return sum(len(nodelist) for nodelist in
+ [self.cloud_nodes, self.booted, self.booting])
def _nodes_busy(self):
return sum(1 for idle in
@@ -201,12 +200,21 @@ class NodeManagerDaemonActor(actor_class):
if idle is False)
def _nodes_wanted(self):
- return min(len(self.last_wishlist) + self._nodes_busy(),
- self.max_nodes) - self._nodes_up()
+ up_count = self._nodes_up()
+ over_max = up_count - self.max_nodes
+ if over_max >= 0:
+ return -over_max
+ else:
+ up_count -= len(self.shutdowns) + self._nodes_busy()
+ return len(self.last_wishlist) - up_count
def _nodes_excess(self):
- needed_nodes = self._nodes_busy() + len(self.last_wishlist)
- return (self._nodes_up() - max(self.min_nodes, needed_nodes))
+ up_count = self._nodes_up() - len(self.shutdowns)
+ over_min = up_count - self.min_nodes
+ if over_min <= 0:
+ return over_min
+ else:
+ return up_count - self._nodes_busy() - len(self.last_wishlist)
def update_server_wishlist(self, wishlist):
self._update_poll_time('server_wishlist')
@@ -257,11 +265,12 @@ class NodeManagerDaemonActor(actor_class):
if nodes_wanted > 1:
self._later.start_node()
- def _actor_nodes(self, node_actor):
- return pykka.get_all([node_actor.cloud_node, node_actor.arvados_node])
+ def _get_actor_attrs(self, actor, *attr_names):
+ return pykka.get_all([getattr(actor, name) for name in attr_names])
def node_up(self, setup_proxy):
- cloud_node, arvados_node = self._actor_nodes(setup_proxy)
+ cloud_node, arvados_node = self._get_actor_attrs(
+ setup_proxy, 'cloud_node', 'arvados_node')
del self.booting[setup_proxy.actor_ref.actor_urn]
setup_proxy.stop()
record = self.cloud_nodes.get(cloud_node.id)
@@ -287,19 +296,23 @@ class NodeManagerDaemonActor(actor_class):
def node_can_shutdown(self, node_actor):
if self._nodes_excess() < 1:
return None
- cloud_node, arvados_node = self._actor_nodes(node_actor)
- if cloud_node.id in self.shutdowns:
+ cloud_node_id = node_actor.cloud_node.get().id
+ if cloud_node_id in self.shutdowns:
return None
- shutdown = self._node_shutdown.start(timer_actor=self._timer,
- cloud_client=self._new_cloud(),
- cloud_node=cloud_node).proxy()
- self.shutdowns[cloud_node.id] = shutdown
+ shutdown = self._node_shutdown.start(
+ timer_actor=self._timer, cloud_client=self._new_cloud(),
+ node_monitor=node_actor.actor_ref).proxy()
+ self.shutdowns[cloud_node_id] = shutdown
shutdown.subscribe(self._later.node_finished_shutdown)
def node_finished_shutdown(self, shutdown_actor):
- cloud_node_id = shutdown_actor.cloud_node.get().id
+ success, cloud_node = self._get_actor_attrs(shutdown_actor, 'success',
+ 'cloud_node')
shutdown_actor.stop()
- if cloud_node_id in self.booted:
+ cloud_node_id = cloud_node.id
+ if not success:
+ del self.shutdowns[cloud_node_id]
+ elif cloud_node_id in self.booted:
self.booted.pop(cloud_node_id).actor.stop()
del self.shutdowns[cloud_node_id]
diff --git a/services/nodemanager/tests/test_computenode_dispatch.py b/services/nodemanager/tests/test_computenode_dispatch.py
index 3a1c8ba..b8239f3 100644
--- a/services/nodemanager/tests/test_computenode_dispatch.py
+++ b/services/nodemanager/tests/test_computenode_dispatch.py
@@ -95,30 +95,63 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
class ComputeNodeShutdownActorTestCase(testutil.ActorTestMixin,
unittest.TestCase):
- def make_mocks(self, cloud_node=None):
+ def make_mocks(self, cloud_node=None, arvados_node=None,
+ shutdown_open=True):
self.timer = testutil.MockTimer()
+ self.shutdowns = testutil.MockShutdownTimer()
+ self.shutdowns._set_state(shutdown_open, 300)
self.cloud_client = mock.MagicMock(name='cloud_client')
+ self.updates = mock.MagicMock(name='update_mock')
if cloud_node is None:
cloud_node = testutil.cloud_node_mock()
self.cloud_node = cloud_node
+ self.arvados_node = arvados_node
- def make_actor(self, arv_node=None):
+ def make_actor(self):
if not hasattr(self, 'timer'):
self.make_mocks()
+ monitor_actor = dispatch.ComputeNodeMonitorActor.start(
+ self.cloud_node, time.time(), self.shutdowns, self.timer,
+ self.updates, self.arvados_node)
self.shutdown_actor = dispatch.ComputeNodeShutdownActor.start(
- self.timer, self.cloud_client, self.cloud_node).proxy()
+ self.timer, self.cloud_client, monitor_actor).proxy()
+ self.monitor_actor = monitor_actor.proxy()
+
+ def check_success_flag(self, expected, allow_msg_count=1):
+ # allow_msg_count is the number of internal messages that may
+ # need to be handled for shutdown to finish.
+ for try_num in range(1 + allow_msg_count):
+ last_flag = self.shutdown_actor.success.get(self.TIMEOUT)
+ if last_flag is expected:
+ break
+ else:
+ self.fail("success flag {} is not {}".format(last_flag, expected))
def test_easy_shutdown(self):
self.make_actor()
- self.shutdown_actor.cloud_node.get(self.TIMEOUT)
- self.stop_proxy(self.shutdown_actor)
+ self.check_success_flag(True)
self.assertTrue(self.cloud_client.destroy_node.called)
+ def test_shutdown_cancelled_when_window_closes(self):
+ self.make_mocks(shutdown_open=False)
+ self.make_actor()
+ self.check_success_flag(False, 2)
+ self.assertFalse(self.cloud_client.destroy_node.called)
+
+ def test_shutdown_retries_when_cloud_fails(self):
+ self.make_mocks()
+ self.cloud_client.destroy_node.return_value = False
+ self.make_actor()
+ self.assertIsNone(self.shutdown_actor.success.get(self.TIMEOUT))
+ self.cloud_client.destroy_node.return_value = True
+ self.check_success_flag(True)
+
def test_late_subscribe(self):
self.make_actor()
subscriber = mock.Mock(name='subscriber_mock')
self.shutdown_actor.subscribe(subscriber).get(self.TIMEOUT)
self.stop_proxy(self.shutdown_actor)
+ self.assertTrue(subscriber.called)
self.assertEqual(self.shutdown_actor.actor_ref.actor_urn,
subscriber.call_args[0][0].actor_ref.actor_urn)
@@ -139,14 +172,8 @@ class ComputeNodeUpdateActorTestCase(testutil.ActorTestMixin,
class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
unittest.TestCase):
- class MockShutdownTimer(object):
- def _set_state(self, is_open, next_opening):
- self.window_open = lambda: is_open
- self.next_opening = lambda: next_opening
-
-
def make_mocks(self, node_num):
- self.shutdowns = self.MockShutdownTimer()
+ self.shutdowns = testutil.MockShutdownTimer()
self.shutdowns._set_state(False, 300)
self.timer = mock.MagicMock(name='timer_mock')
self.updates = mock.MagicMock(name='update_mock')
diff --git a/services/nodemanager/tests/test_daemon.py b/services/nodemanager/tests/test_daemon.py
index 394fb88..31a682f 100644
--- a/services/nodemanager/tests/test_daemon.py
+++ b/services/nodemanager/tests/test_daemon.py
@@ -260,6 +260,57 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
self.stop_proxy(self.daemon)
self.assertFalse(self.node_shutdown.start.called)
+ def test_node_shutdown_after_cancelled_shutdown(self):
+ cloud_node = testutil.cloud_node_mock(5)
+ self.make_daemon([cloud_node], [testutil.arvados_node_mock(5)])
+ self.assertEqual(1, self.alive_monitor_count())
+ monitor = self.monitor_list()[0].proxy()
+ shutdown_proxy = self.node_shutdown.start().proxy
+ shutdown_proxy().cloud_node.get.return_value = cloud_node
+ shutdown_proxy().success.get.return_value = False
+ shutdown_proxy.reset_mock()
+ self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
+ self.assertTrue(shutdown_proxy.called)
+ self.daemon.node_finished_shutdown(shutdown_proxy()).get(self.TIMEOUT)
+ shutdown_proxy().success.get.return_value = True
+ shutdown_proxy.reset_mock()
+ self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
+ self.assertTrue(shutdown_proxy.called)
+
+ def test_nodes_shutting_down_replaced_below_max_nodes(self):
+ cloud_node = testutil.cloud_node_mock(6)
+ self.make_daemon([cloud_node], [testutil.arvados_node_mock(6)])
+ self.assertEqual(1, self.alive_monitor_count())
+ monitor = self.monitor_list()[0].proxy()
+ self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
+ self.assertTrue(self.node_shutdown.start.called)
+ self.daemon.update_server_wishlist(
+ [testutil.MockSize(6)]).get(self.TIMEOUT)
+ self.stop_proxy(self.daemon)
+ self.assertTrue(self.node_setup.start.called)
+
+ def test_nodes_shutting_down_not_replaced_at_max_nodes(self):
+ cloud_node = testutil.cloud_node_mock(7)
+ self.make_daemon([cloud_node], [testutil.arvados_node_mock(7)],
+ max_nodes=1)
+ self.assertEqual(1, self.alive_monitor_count())
+ monitor = self.monitor_list()[0].proxy()
+ self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
+ self.assertTrue(self.node_shutdown.start.called)
+ self.daemon.update_server_wishlist(
+ [testutil.MockSize(7)]).get(self.TIMEOUT)
+ self.stop_proxy(self.daemon)
+ self.assertFalse(self.node_setup.start.called)
+
+ def test_nodes_shutting_down_count_against_excess(self):
+ cloud_nodes = [testutil.cloud_node_mock(n) for n in [8, 9]]
+ arv_nodes = [testutil.arvados_node_mock(n) for n in [8, 9]]
+ self.make_daemon(cloud_nodes, arv_nodes, [testutil.MockSize(8)])
+ self.assertEqual(2, self.alive_monitor_count())
+ for mon_ref in self.monitor_list():
+ self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
+ self.assertEqual(1, self.node_shutdown.start.call_count)
+
def test_clean_shutdown_waits_for_node_setup_finish(self):
new_node = self.start_node_boot()
self.daemon.shutdown().get(self.TIMEOUT)
diff --git a/services/nodemanager/tests/testutil.py b/services/nodemanager/tests/testutil.py
index a1b0658..56f22c8 100644
--- a/services/nodemanager/tests/testutil.py
+++ b/services/nodemanager/tests/testutil.py
@@ -41,6 +41,12 @@ def cloud_node_mock(node_num=99):
def ip_address_mock(last_octet):
return '10.20.30.{}'.format(last_octet)
+class MockShutdownTimer(object):
+ def _set_state(self, is_open, next_opening):
+ self.window_open = lambda: is_open
+ self.next_opening = lambda: next_opening
+
+
class MockSize(object):
def __init__(self, factor):
self.id = 'z{}.test'.format(factor)
commit 4a84c449b8b523042bc5bda6f2b4b93730fa0617
Author: Brett Smith <brett at curoverse.com>
Date: Thu Nov 13 09:47:07 2014 -0500
4380: Node Manager monitors respond to shutdown_eligible message.
diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
index d613ef1..4dc3dcb 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
@@ -246,8 +246,10 @@ class ComputeNodeMonitorActor(config.actor_class):
result = result and not self.arvados_node['job_uuid']
return result
- def _shutdown_eligible(self):
- if self.arvados_node is None:
+ def shutdown_eligible(self):
+ if not self._shutdowns.window_open():
+ return False
+ elif self.arvados_node is None:
# If this is a new, unpaired node, it's eligible for
# shutdown--we figure there was an error during bootstrap.
return timestamp_fresh(self.cloud_node_start_time,
@@ -257,17 +259,15 @@ class ComputeNodeMonitorActor(config.actor_class):
def consider_shutdown(self):
next_opening = self._shutdowns.next_opening()
- if self._shutdowns.window_open():
- if self._shutdown_eligible():
- self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
- _notify_subscribers(self._later, self.subscribers)
- else:
- self._debug("Node %s shutdown window open but node busy.",
- self.cloud_node.id)
- else:
+ if self.shutdown_eligible():
+ self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
+ _notify_subscribers(self._later, self.subscribers)
+ elif self._shutdowns.window_open():
+ self._debug("Node %s shutdown window open but node busy.",
+ self.cloud_node.id)
+ elif self.last_shutdown_opening != next_opening:
self._debug("Node %s shutdown window closed. Next at %s.",
self.cloud_node.id, time.ctime(next_opening))
- if self.last_shutdown_opening != next_opening:
self._timer.schedule(next_opening, self._later.consider_shutdown)
self.last_shutdown_opening = next_opening
diff --git a/services/nodemanager/tests/test_computenode_dispatch.py b/services/nodemanager/tests/test_computenode_dispatch.py
index ece186b..3a1c8ba 100644
--- a/services/nodemanager/tests/test_computenode_dispatch.py
+++ b/services/nodemanager/tests/test_computenode_dispatch.py
@@ -196,6 +196,16 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
self.assertTrue(self.timer.schedule.called)
self.assertEqual(300, self.timer.schedule.call_args[0][0])
+ def test_shutdown_window_close_scheduling(self):
+ self.make_actor()
+ self.shutdowns._set_state(False, 600)
+ self.timer.schedule.reset_mock()
+ self.node_actor.consider_shutdown().get(self.TIMEOUT)
+ self.stop_proxy(self.node_actor)
+ self.assertTrue(self.timer.schedule.called)
+ self.assertEqual(600, self.timer.schedule.call_args[0][0])
+ self.assertFalse(self.subscriber.called)
+
def test_shutdown_subscription(self):
self.make_actor()
self.shutdowns._set_state(True, 600)
@@ -207,41 +217,31 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
def test_shutdown_without_arvados_node(self):
self.make_actor()
self.shutdowns._set_state(True, 600)
- self.node_actor.consider_shutdown().get(self.TIMEOUT)
- self.assertTrue(self.subscriber.called)
+ self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
def test_no_shutdown_without_arvados_node_and_old_cloud_node(self):
self.make_actor(start_time=0)
self.shutdowns._set_state(True, 600)
- self.node_actor.consider_shutdown().get(self.TIMEOUT)
- self.assertFalse(self.subscriber.called)
+ self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
- def check_shutdown_rescheduled(self, window_open, next_window,
- schedule_time=None):
- self.shutdowns._set_state(window_open, next_window)
- self.timer.schedule.reset_mock()
- self.node_actor.consider_shutdown().get(self.TIMEOUT)
- self.stop_proxy(self.node_actor)
- self.assertTrue(self.timer.schedule.called)
- if schedule_time is not None:
- self.assertEqual(schedule_time, self.timer.schedule.call_args[0][0])
- self.assertFalse(self.subscriber.called)
-
- def test_shutdown_window_close_scheduling(self):
- self.make_actor()
- self.check_shutdown_rescheduled(False, 600, 600)
+ def test_no_shutdown_when_window_closed(self):
+ self.make_actor(3, testutil.arvados_node_mock(3, job_uuid=None))
+ self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
def test_no_shutdown_when_node_running_job(self):
self.make_actor(4, testutil.arvados_node_mock(4, job_uuid=True))
- self.check_shutdown_rescheduled(True, 600)
+ self.shutdowns._set_state(True, 600)
+ self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
def test_no_shutdown_when_node_state_unknown(self):
self.make_actor(5, testutil.arvados_node_mock(5, info={}))
- self.check_shutdown_rescheduled(True, 600)
+ self.shutdowns._set_state(True, 600)
+ self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
def test_no_shutdown_when_node_state_stale(self):
self.make_actor(6, testutil.arvados_node_mock(6, age=90000))
- self.check_shutdown_rescheduled(True, 600)
+ self.shutdowns._set_state(True, 600)
+ self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
def test_arvados_node_match(self):
self.make_actor(2)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list