[ARVADOS] created: c8196e12b4ae8cc4215a5ed68bcab69d2c3d2f61
git at public.curoverse.com
Fri Dec 5 17:45:53 EST 2014
at c8196e12b4ae8cc4215a5ed68bcab69d2c3d2f61 (commit)
commit c8196e12b4ae8cc4215a5ed68bcab69d2c3d2f61
Author: Brett Smith <brett at curoverse.com>
Date: Fri Dec 5 17:27:37 2014 -0500
4293: Node Manager shuts down nodes that fail to boot.
This lets Node Manager detect when a node fails to bootstrap and correct
the problem by shutting the node down.
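As a rough sketch of the behavior this commit introduces (plain functions
standing in for the daemon and timer actors; the names follow the diffs
below, but this is illustrative rather than the real Node Manager API):

import time

def on_node_up(timer, daemon, cloud_node, boot_fail_after=1800):
    # Give a freshly booted cloud node boot_fail_after seconds to pair with
    # an Arvados node before treating the boot as failed.
    timer.schedule(time.time() + boot_fail_after,
                   daemon.shutdown_unpaired_node, cloud_node.id)

def shutdown_unpaired_node(daemon, record):
    # A boot-failure shutdown is started with cancellable=False, so the
    # usual "is this node still eligible for shutdown?" check is skipped.
    if record.arvados_node is None:
        daemon.begin_node_shutdown(record.actor, cancellable=False)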
diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
index ae0a65b..c79d8f9 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
@@ -136,12 +136,18 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
This actor simply destroys a cloud node, retrying as needed.
"""
def __init__(self, timer_actor, cloud_client, node_monitor,
- retry_wait=1, max_retry_wait=180):
+ cancellable=True, retry_wait=1, max_retry_wait=180):
+ # If a ShutdownActor is cancellable, it will ask the
+ # ComputeNodeMonitorActor if it's still eligible before taking each
+ # action, and stop the shutdown process if the node is no longer
+ # eligible. Normal shutdowns based on job demand should be
+ # cancellable; shutdowns based on node misbehavior should not.
super(ComputeNodeShutdownActor, self).__init__(
'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
self._cloud = cloud_client
self._monitor = node_monitor.proxy()
self.cloud_node = self._monitor.cloud_node.get()
+ self.cancellable = cancellable
self.success = None
def on_start(self):
@@ -154,7 +160,8 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
def _stop_if_window_closed(orig_func):
@functools.wraps(orig_func)
def wrapper(self, *args, **kwargs):
- if not self._monitor.shutdown_eligible().get():
+ if (self.cancellable and
+ (not self._monitor.shutdown_eligible().get())):
self._logger.info(
"Cloud node %s shutdown cancelled - no longer eligible.",
self.cloud_node.id)
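For reference, the eligibility guard above boils down to this decorator
pattern (a minimal sketch; the real actor also logs the cancellation and
finishes its cleanup, which is omitted here):

import functools

def _stop_if_window_closed(orig_func):
    @functools.wraps(orig_func)
    def wrapper(self, *args, **kwargs):
        # Cancellable shutdowns re-check eligibility before every step;
        # uncancellable ones (e.g. a failed boot) never do.
        if self.cancellable and not self._monitor.shutdown_eligible().get():
            return None  # the real actor logs and aborts the shutdown here
        return orig_func(self, *args, **kwargs)
    return wrapper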
diff --git a/services/nodemanager/arvnodeman/config.py b/services/nodemanager/arvnodeman/config.py
index 079e623..f018015 100644
--- a/services/nodemanager/arvnodeman/config.py
+++ b/services/nodemanager/arvnodeman/config.py
@@ -6,6 +6,7 @@ import ConfigParser
import importlib
import logging
import ssl
+import sys
import arvados
import httplib2
@@ -42,6 +43,7 @@ class NodeManagerConfig(ConfigParser.SafeConfigParser):
'poll_time': '60',
'max_poll_time': '300',
'poll_stale_after': '600',
+ 'boot_fail_after': str(sys.maxint),
'node_stale_after': str(60 * 60 * 2)},
'Logging': {'file': '/dev/stderr',
'level': 'WARNING'},
diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index 9f22568..d03d145 100644
--- a/services/nodemanager/arvnodeman/daemon.py
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -26,14 +26,16 @@ class _BaseNodeTracker(object):
self.nodes = {}
self.orphans = {}
- def __getitem__(self, key):
- return self.nodes[key]
-
- def __len__(self):
- return len(self.nodes)
+ # Proxy the methods listed below to self.nodes.
+ def _proxy_method(name):
+ method = getattr(dict, name)
+ @functools.wraps(method, ('__name__', '__doc__'))
+ def wrapper(self, *args, **kwargs):
+ return method(self.nodes, *args, **kwargs)
+ return wrapper
- def get(self, key, default=None):
- return self.nodes.get(key, default)
+ for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
+ locals()[_method_name] = _proxy_method(_method_name)
def record_key(self, record):
return self.item_key(getattr(record, self.RECORD_ATTR))
@@ -96,7 +98,9 @@ class NodeManagerDaemonActor(actor_class):
cloud_nodes_actor, cloud_update_actor, timer_actor,
arvados_factory, cloud_factory,
shutdown_windows, min_nodes, max_nodes,
- poll_stale_after=600, node_stale_after=7200,
+ poll_stale_after=600,
+ boot_fail_after=1800,
+ node_stale_after=7200,
node_setup_class=dispatch.ComputeNodeSetupActor,
node_shutdown_class=dispatch.ComputeNodeShutdownActor,
node_actor_class=dispatch.ComputeNodeMonitorActor):
@@ -115,6 +119,7 @@ class NodeManagerDaemonActor(actor_class):
self.min_nodes = min_nodes
self.max_nodes = max_nodes
self.poll_stale_after = poll_stale_after
+ self.boot_fail_after = boot_fail_after
self.node_stale_after = node_stale_after
self.last_polls = {}
for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
@@ -269,15 +274,15 @@ class NodeManagerDaemonActor(actor_class):
return pykka.get_all([getattr(actor, name) for name in attr_names])
def node_up(self, setup_proxy):
- cloud_node, arvados_node = self._get_actor_attrs(
- setup_proxy, 'cloud_node', 'arvados_node')
+ cloud_node = setup_proxy.cloud_node.get()
del self.booting[setup_proxy.actor_ref.actor_urn]
setup_proxy.stop()
record = self.cloud_nodes.get(cloud_node.id)
if record is None:
record = self._new_node(cloud_node)
self.booted[cloud_node.id] = record
- self._pair_nodes(record, arvados_node)
+ self._timer.schedule(time.time() + self.boot_fail_after,
+ self._later.shutdown_unpaired_node, cloud_node.id)
@_check_poll_freshness
def stop_booting_node(self):
@@ -292,19 +297,31 @@ class NodeManagerDaemonActor(actor_class):
self._later.stop_booting_node()
break
- @_check_poll_freshness
- def node_can_shutdown(self, node_actor):
- if self._nodes_excess() < 1:
- return None
+ def _begin_node_shutdown(self, node_actor, cancellable):
cloud_node_id = node_actor.cloud_node.get().id
if cloud_node_id in self.shutdowns:
return None
shutdown = self._node_shutdown.start(
timer_actor=self._timer, cloud_client=self._new_cloud(),
- node_monitor=node_actor.actor_ref).proxy()
+ node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
self.shutdowns[cloud_node_id] = shutdown
shutdown.subscribe(self._later.node_finished_shutdown)
+ @_check_poll_freshness
+ def node_can_shutdown(self, node_actor):
+ if self._nodes_excess() > 0:
+ self._begin_node_shutdown(node_actor, cancellable=True)
+
+ def shutdown_unpaired_node(self, cloud_node_id):
+ for record_dict in [self.cloud_nodes, self.booted]:
+ if cloud_node_id in record_dict:
+ record = record_dict[cloud_node_id]
+ break
+ else:
+ return None
+ if record.arvados_node is None:
+ self._begin_node_shutdown(record.actor, cancellable=False)
+
def node_finished_shutdown(self, shutdown_actor):
success, cloud_node = self._get_actor_attrs(shutdown_actor, 'success',
'cloud_node')
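The _proxy_method block above generates dict wrappers at class-definition
time by assigning into the class namespace. A standalone sketch of the same
pattern, with an illustrative class name and a short usage check:

import functools

class NodeTrackerSketch(object):
    def __init__(self):
        self.nodes = {}

    # Proxy the listed dict methods to self.nodes, as in _BaseNodeTracker.
    def _proxy_method(name):
        method = getattr(dict, name)
        @functools.wraps(method, ('__name__', '__doc__'))
        def wrapper(self, *args, **kwargs):
            return method(self.nodes, *args, **kwargs)
        return wrapper

    # Assigning into locals() works here because this is a class body, whose
    # namespace is a plain dict, not a function body.
    for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
        locals()[_method_name] = _proxy_method(_method_name)

tracker = NodeTrackerSketch()
tracker.nodes['compute1'] = 'record'
assert 'compute1' in tracker and len(tracker) == 1
assert tracker.get('compute2') is None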
diff --git a/services/nodemanager/arvnodeman/launcher.py b/services/nodemanager/arvnodeman/launcher.py
index 9f5e162..5fa404f 100644
--- a/services/nodemanager/arvnodeman/launcher.py
+++ b/services/nodemanager/arvnodeman/launcher.py
@@ -119,6 +119,7 @@ def main(args=None):
config.getint('Daemon', 'min_nodes'),
config.getint('Daemon', 'max_nodes'),
config.getint('Daemon', 'poll_stale_after'),
+ config.getint('Daemon', 'boot_fail_after'),
config.getint('Daemon', 'node_stale_after'),
node_setup, node_shutdown, node_monitor).proxy()
diff --git a/services/nodemanager/doc/ec2.example.cfg b/services/nodemanager/doc/ec2.example.cfg
index 0f9caca..024ed2b 100644
--- a/services/nodemanager/doc/ec2.example.cfg
+++ b/services/nodemanager/doc/ec2.example.cfg
@@ -27,6 +27,12 @@ max_poll_time = 300
# information is too outdated.
poll_stale_after = 600
+# If Node Manager boots a cloud node, and it does not pair with an Arvados
+# node before this long, assume that there was a cloud bootstrap failure and
+# shut it down. Note that normal shutdown windows apply (see the Cloud
+# section), so this should be shorter than the first shutdown window value.
+boot_fail_after = 1800
+
# "Node stale time" affects two related behaviors.
# 1. If a compute node has been running for at least this long, but it
# isn't paired with an Arvados node, do not shut it down, but leave it alone.
diff --git a/services/nodemanager/tests/test_computenode_dispatch.py b/services/nodemanager/tests/test_computenode_dispatch.py
index 7f6988d..c86dcfd 100644
--- a/services/nodemanager/tests/test_computenode_dispatch.py
+++ b/services/nodemanager/tests/test_computenode_dispatch.py
@@ -106,14 +106,14 @@ class ComputeNodeShutdownActorMixin(testutil.ActorTestMixin):
self.cloud_node = cloud_node
self.arvados_node = arvados_node
- def make_actor(self):
+ def make_actor(self, cancellable=True):
if not hasattr(self, 'timer'):
self.make_mocks()
monitor_actor = dispatch.ComputeNodeMonitorActor.start(
self.cloud_node, time.time(), self.shutdowns, self.timer,
self.updates, self.arvados_node)
self.shutdown_actor = self.ACTOR_CLASS.start(
- self.timer, self.cloud_client, monitor_actor).proxy()
+ self.timer, self.cloud_client, monitor_actor, cancellable).proxy()
self.monitor_actor = monitor_actor.proxy()
def check_success_flag(self, expected, allow_msg_count=1):
@@ -126,6 +126,15 @@ class ComputeNodeShutdownActorMixin(testutil.ActorTestMixin):
else:
self.fail("success flag {} is not {}".format(last_flag, expected))
+ def test_uncancellable_shutdown(self, *mocks):
+ self.make_mocks(shutdown_open=False)
+ self.cloud_client.destroy_node.return_value = False
+ self.make_actor(cancellable=False)
+ self.check_success_flag(None, 0)
+ self.shutdowns._set_state(True, 600)
+ self.cloud_client.destroy_node.return_value = True
+ self.check_success_flag(True)
+
class ComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
unittest.TestCase):
diff --git a/services/nodemanager/tests/test_daemon.py b/services/nodemanager/tests/test_daemon.py
index 31a682f..0f146a1 100644
--- a/services/nodemanager/tests/test_daemon.py
+++ b/services/nodemanager/tests/test_daemon.py
@@ -22,14 +22,14 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
self.cloud_factory = mock.MagicMock(name='cloud_mock')
self.cloud_factory().node_start_time.return_value = time.time()
self.cloud_updates = mock.MagicMock(name='updates_mock')
- self.timer = testutil.MockTimer()
+ self.timer = testutil.MockTimer(deliver_immediately=False)
self.node_setup = mock.MagicMock(name='setup_mock')
self.node_shutdown = mock.MagicMock(name='shutdown_mock')
self.daemon = nmdaemon.NodeManagerDaemonActor.start(
self.server_wishlist_poller, self.arvados_nodes_poller,
self.cloud_nodes_poller, self.cloud_updates, self.timer,
self.arv_factory, self.cloud_factory,
- [54, 5, 1], min_nodes, max_nodes, 600, 3600,
+ [54, 5, 1], min_nodes, max_nodes, 600, 1800, 3600,
self.node_setup, self.node_shutdown).proxy()
if cloud_nodes is not None:
self.daemon.update_cloud_nodes(cloud_nodes).get(self.TIMEOUT)
@@ -44,6 +44,12 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
def alive_monitor_count(self):
return sum(1 for actor in self.monitor_list() if actor.is_alive())
+ def assertShutdownCancellable(self, expected=True):
+ self.assertTrue(self.node_shutdown.start.called)
+ self.assertIs(expected,
+ self.node_shutdown.start.call_args[1]['cancellable'],
+ "ComputeNodeShutdownActor incorrectly cancellable")
+
def test_easy_node_creation(self):
size = testutil.MockSize(1)
self.make_daemon(want_sizes=[size])
@@ -195,8 +201,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
monitor = self.monitor_list()[0].proxy()
self.daemon.update_server_wishlist([])
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
- self.assertTrue(self.node_shutdown.start.called,
- "daemon did not shut down booted node on offer")
+ self.assertShutdownCancellable(True)
shutdown = self.node_shutdown.start().proxy()
shutdown.cloud_node.get.return_value = cloud_node
self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
@@ -210,6 +215,37 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
self.assertTrue(self.node_setup.start.called,
"second node not started after booted node stopped")
+ def test_booted_node_shut_down_when_never_listed(self):
+ setup = self.start_node_boot()
+ self.daemon.node_up(setup).get(self.TIMEOUT)
+ self.assertEqual(1, self.alive_monitor_count())
+ self.assertFalse(self.node_shutdown.start.called)
+ self.timer.deliver()
+ self.stop_proxy(self.daemon)
+ self.assertShutdownCancellable(False)
+
+ def test_booted_node_shut_down_when_never_paired(self):
+ cloud_node = testutil.cloud_node_mock(2)
+ setup = self.start_node_boot(cloud_node)
+ self.daemon.node_up(setup).get(self.TIMEOUT)
+ self.assertEqual(1, self.alive_monitor_count())
+ self.daemon.update_cloud_nodes([cloud_node])
+ self.timer.deliver()
+ self.stop_proxy(self.daemon)
+ self.assertShutdownCancellable(False)
+
+ def test_node_that_pairs_not_considered_failed_boot(self):
+ cloud_node = testutil.cloud_node_mock(3)
+ arv_node = testutil.arvados_node_mock(3)
+ setup = self.start_node_boot(cloud_node, arv_node)
+ self.daemon.node_up(setup).get(self.TIMEOUT)
+ self.assertEqual(1, self.alive_monitor_count())
+ self.daemon.update_cloud_nodes([cloud_node])
+ self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
+ self.timer.deliver()
+ self.stop_proxy(self.daemon)
+ self.assertFalse(self.node_shutdown.start.called)
+
def test_booting_nodes_shut_down(self):
self.make_daemon(want_sizes=[testutil.MockSize(1)])
self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
@@ -317,6 +353,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
self.assertTrue(new_node.stop_if_no_cloud_node.called)
self.daemon.node_up(new_node).get(self.TIMEOUT)
self.assertTrue(new_node.stop.called)
+ self.timer.deliver()
self.assertTrue(
self.daemon.actor_ref.actor_stopped.wait(self.TIMEOUT))
@@ -325,5 +362,6 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
self.make_daemon(want_sizes=[size])
self.daemon.shutdown().get(self.TIMEOUT)
self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
+ self.timer.deliver()
self.stop_proxy(self.daemon)
self.assertEqual(1, self.node_setup.start.call_count)
diff --git a/services/nodemanager/tests/testutil.py b/services/nodemanager/tests/testutil.py
index 56f22c8..30808ac 100644
--- a/services/nodemanager/tests/testutil.py
+++ b/services/nodemanager/tests/testutil.py
@@ -2,6 +2,7 @@
from __future__ import absolute_import, print_function
+import threading
import time
import mock
@@ -62,8 +63,23 @@ class MockSize(object):
class MockTimer(object):
+ def __init__(self, deliver_immediately=True):
+ self.deliver_immediately = deliver_immediately
+ self.messages = []
+ self.lock = threading.Lock()
+
+ def deliver(self):
+ with self.lock:
+ to_deliver = self.messages
+ self.messages = []
+ for callback, args, kwargs in to_deliver:
+ callback(*args, **kwargs)
+
def schedule(self, want_time, callback, *args, **kwargs):
- return callback(*args, **kwargs)
+ with self.lock:
+ self.messages.append((callback, args, kwargs))
+ if self.deliver_immediately:
+ self.deliver()
class ActorTestMixin(object):
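A hedged usage sketch of the reworked MockTimer (assuming it is imported
from the tests' testutil module shown above; the callback payload is purely
illustrative):

import time
from testutil import MockTimer

fired = []
timer = MockTimer(deliver_immediately=False)
timer.schedule(time.time() + 1800, fired.append, 'boot_fail_check')
assert fired == []              # nothing runs until the test calls deliver()
timer.deliver()
assert fired == ['boot_fail_check']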
commit 5d86931bb58df839ed5b98d012d687065dfd256c
Author: Brett Smith <brett at curoverse.com>
Date: Fri Dec 5 17:45:13 2014 -0500
4380: Node Manager SLURM dispatcher proceeds from more states.
Per discussion with Ward. Our main concern is that Node Manager
shouldn't shut down nodes that are doing work. We feel comfortable
broadening the definition of "not doing work" to this set of states.
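In sketch form, the broadened check amounts to the following (the sinfo
invocation mirrors await_slurm_drain in the diff below; this is a simplified
standalone function, not the actual actor method):

import subprocess

SLURM_END_STATES = frozenset(['down\n', 'down*\n', 'drain\n', 'fail\n'])

def slurm_node_is_done(nodename):
    # Any of these states means the node is no longer doing work, so it is
    # safe for Node Manager to destroy it.
    state = subprocess.check_output(
        ['sinfo', '--noheader', '-o', '%t', '-n', nodename])
    return state in SLURM_END_STATES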
diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py b/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
index 27397e5..6eaa8b9 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
@@ -10,6 +10,8 @@ from . import \
from . import ComputeNodeShutdownActor as ShutdownActorBase
class ComputeNodeShutdownActor(ShutdownActorBase):
+ SLURM_END_STATES = frozenset(['down\n', 'down*\n', 'drain\n', 'fail\n'])
+
def on_start(self):
arv_node = self._monitor.arvados_node.get()
if arv_node is None:
@@ -42,7 +44,7 @@ class ComputeNodeShutdownActor(ShutdownActorBase):
def await_slurm_drain(self):
output = subprocess.check_output(
['sinfo', '--noheader', '-o', '%t', '-n', self._nodename])
- if output == 'drain\n':
+ if output in self.SLURM_END_STATES:
self._later.shutdown_node()
else:
self._timer.schedule(time.time() + 10,
diff --git a/services/nodemanager/tests/test_computenode_dispatch_slurm.py b/services/nodemanager/tests/test_computenode_dispatch_slurm.py
index ccac8b2..93cc60d 100644
--- a/services/nodemanager/tests/test_computenode_dispatch_slurm.py
+++ b/services/nodemanager/tests/test_computenode_dispatch_slurm.py
@@ -22,21 +22,31 @@ class SLURMComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
for s in args:
self.assertIn(s, slurm_cmd)
- def check_success_after_reset(self, proc_mock):
+ def check_success_after_reset(self, proc_mock, end_state='drain\n'):
self.make_mocks(arvados_node=testutil.arvados_node_mock(63))
self.make_actor()
self.check_success_flag(None, 0)
self.check_success_flag(None, 0)
# Order is critical here: if the mock gets called when no return value
# or side effect is set, we may invoke a real subprocess.
- proc_mock.return_value = 'drain\n'
+ proc_mock.return_value = end_state
proc_mock.side_effect = None
self.check_success_flag(True, 3)
self.check_slurm_got_args(proc_mock, 'compute63')
- def test_wait_for_drained_state(self, proc_mock):
- proc_mock.return_value = 'drng\n'
- self.check_success_after_reset(proc_mock)
+ def make_wait_state_test(start_state='drng\n', end_state='drain\n'):
+ def test(self, proc_mock):
+ proc_mock.return_value = start_state
+ self.check_success_after_reset(proc_mock, end_state)
+ return test
+
+ for wait_state in ['alloc\n', 'drng\n', 'idle*\n']:
+ locals()['test_wait_while_' + wait_state.strip()
+ ] = make_wait_state_test(start_state=wait_state)
+
+ for end_state in ['down\n', 'down*\n', 'drain\n', 'fail\n']:
+ locals()['test_wait_until_' + end_state.strip()
+ ] = make_wait_state_test(end_state=end_state)
def test_retry_failed_slurm_calls(self, proc_mock):
proc_mock.side_effect = subprocess.CalledProcessError(1, ["mock"])
-----------------------------------------------------------------------
hooks/post-receive