[ARVADOS] created: 241ef75ec8b6cf5dd14ce19fa068462adaeb0386
From: Git user <git at public.curoverse.com>
Date: Wed Apr 6 15:36:16 EDT 2016

        at 241ef75ec8b6cf5dd14ce19fa068462adaeb0386 (commit)
commit 241ef75ec8b6cf5dd14ce19fa068462adaeb0386
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Apr 6 11:22:31 2016 -0400
8799: Nodes whose slurm_state is "down" are checked with sinfo and either re-enabled or considered valid for shutdown.
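
In outline, the monitor now asks SLURM directly for the node's state with sinfo and, when the node is merely drained or draining, re-enables it with scontrol; nodes SLURM reports as down or failed remain shutdown candidates. A minimal, self-contained sketch of that flow (the helper names below are illustrative, not the actual actor methods):

    import subprocess

    SLURM_DRAIN_STATES = frozenset(['drain\n', 'drng\n'])

    def get_slurm_state(nodename):
        # 'sinfo -o %t' prints the node's compact state, e.g. "drain\n" or "idle\n".
        return subprocess.check_output(
            ['sinfo', '--noheader', '-o', '%t', '-n', nodename])

    def resume_if_drained(nodename):
        # Only issue RESUME when SLURM reports the node as drained/draining;
        # resuming an idle or allocated node would just raise an error.
        if get_slurm_state(nodename) in SLURM_DRAIN_STATES:
            subprocess.check_output(
                ['scontrol', 'update', 'NodeName=' + nodename, 'State=RESUME'])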
diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
index 2ddfb0a..552ed01 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
@@ -219,7 +219,17 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
return orig_func(self, *args, **kwargs)
return stop_wrapper
- @ComputeNodeStateChangeBase._finish_on_exception
+ def _cancel_on_exception(orig_func):
+ @functools.wraps(orig_func)
+ def finish_wrapper(self, *args, **kwargs):
+ try:
+ return orig_func(self, *args, **kwargs)
+ except Exception as error:
+ self._logger.error("Actor error %s", error)
+ self._later.cancel_shutdown("Unhandled exception %s" % error)
+ return finish_wrapper
+
+ @_cancel_on_exception
@_stop_if_window_closed
@RetryMixin._retry()
def shutdown_node(self):
@@ -387,6 +397,9 @@ class ComputeNodeMonitorActor(config.actor_class):
else:
return "node is not idle."
+ def resume_node(self):
+ pass
+
def consider_shutdown(self):
try:
next_opening = self._shutdowns.next_opening()
diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py b/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
index 255e50a..9ef54b3 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
@@ -6,16 +6,28 @@ import subprocess
import time
from . import \
- ComputeNodeSetupActor, ComputeNodeUpdateActor, ComputeNodeMonitorActor
+ ComputeNodeSetupActor, ComputeNodeUpdateActor
from . import ComputeNodeShutdownActor as ShutdownActorBase
+from . import ComputeNodeMonitorActor as MonitorActorBase
from .. import RetryMixin
-class ComputeNodeShutdownActor(ShutdownActorBase):
+class SlurmMixin(object):
SLURM_END_STATES = frozenset(['down\n', 'down*\n',
'drain\n', 'drain*\n',
'fail\n', 'fail*\n'])
SLURM_DRAIN_STATES = frozenset(['drain\n', 'drng\n'])
+ def _set_node_state(self, nodename, state, *args):
+ cmd = ['scontrol', 'update', 'NodeName=' + nodename,
+ 'State=' + state]
+ cmd.extend(args)
+ subprocess.check_output(cmd)
+
+ def _get_slurm_state(self, nodename):
+ return subprocess.check_output(['sinfo', '--noheader', '-o', '%t', '-n', nodename])
+
+
+class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
def on_start(self):
arv_node = self._arvados_node()
if arv_node is None:
@@ -27,21 +39,12 @@ class ComputeNodeShutdownActor(ShutdownActorBase):
self._logger.info("Draining SLURM node %s", self._nodename)
self._later.issue_slurm_drain()
- def _set_node_state(self, state, *args):
- cmd = ['scontrol', 'update', 'NodeName=' + self._nodename,
- 'State=' + state]
- cmd.extend(args)
- subprocess.check_output(cmd)
-
- def _get_slurm_state(self):
- return subprocess.check_output(['sinfo', '--noheader', '-o', '%t', '-n', self._nodename])
-
@RetryMixin._retry((subprocess.CalledProcessError,))
def cancel_shutdown(self, reason):
if self._nodename:
- if self._get_slurm_state() in self.SLURM_DRAIN_STATES:
+ if self._get_slurm_state(self._nodename) in self.SLURM_DRAIN_STATES:
# Resume from "drng" or "drain"
- self._set_node_state('RESUME')
+ self._set_node_state(self._nodename, 'RESUME')
else:
# Node is in a state such as 'idle' or 'alloc' so don't
# try to resume it because that will just raise an error.
@@ -51,16 +54,36 @@ class ComputeNodeShutdownActor(ShutdownActorBase):
@RetryMixin._retry((subprocess.CalledProcessError,))
@ShutdownActorBase._stop_if_window_closed
def issue_slurm_drain(self):
- self._set_node_state('DRAIN', 'Reason=Node Manager shutdown')
+ self._set_node_state(self._nodename, 'DRAIN', 'Reason=Node Manager shutdown')
self._logger.info("Waiting for SLURM node %s to drain", self._nodename)
self._later.await_slurm_drain()
@RetryMixin._retry((subprocess.CalledProcessError,))
@ShutdownActorBase._stop_if_window_closed
def await_slurm_drain(self):
- output = self._get_slurm_state()
+ output = self._get_slurm_state(self._nodename)
if output in self.SLURM_END_STATES:
self._later.shutdown_node()
else:
self._timer.schedule(time.time() + 10,
self._later.await_slurm_drain)
+
+
+class ComputeNodeMonitorActor(SlurmMixin, MonitorActorBase):
+
+ def shutdown_eligible(self):
+ if (self.arvados_node is not None and
+ self._get_slurm_state(self.arvados_node['hostname']) in self.SLURM_END_STATES):
+ return True
+ else:
+ return super(ComputeNodeMonitorActor, self).shutdown_eligible()
+
+ def resume_node(self):
+ try:
+ if (self.arvados_node is not None and
+ self._get_slurm_state(self.arvados_node['hostname']) in self.SLURM_DRAIN_STATES):
+ # Resume from "drng" or "drain"
+ self._set_node_state(self.arvados_node['hostname'], 'RESUME')
+ except Exception as error:
+ self._logger.warn(
+ "Exception reenabling node: %s", error, exc_info=error)
diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index 7976f21..2c2ebe8 100644
--- a/services/nodemanager/arvnodeman/daemon.py
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -151,6 +151,9 @@ class NodeManagerDaemonActor(actor_class):
def _update_poll_time(self, poll_key):
self.last_polls[poll_key] = time.time()
+ def _resume_node(self, node_record):
+ node_record.actor.resume_node()
+
def _pair_nodes(self, node_record, arvados_node):
self._logger.info("Cloud node %s is now paired with Arvados node %s",
node_record.cloud_node.name, arvados_node['uuid'])
@@ -218,6 +221,11 @@ class NodeManagerDaemonActor(actor_class):
if cloud_rec.actor.offer_arvados_pair(arv_node).get():
self._pair_nodes(cloud_rec, arv_node)
break
+ for rec in self.arvados_nodes.nodes.itervalues():
+ if (rec.arvados_node["info"].get("slurm_state") == "down" and
+ rec.cloud_node is not None and
+ rec.cloud_node.id not in self.shutdowns):
+ self._resume_node(rec)
def _nodes_booting(self, size):
s = sum(1
@@ -238,8 +246,15 @@ class NodeManagerDaemonActor(actor_class):
for c in self.cloud_nodes.nodes.itervalues()
if size is None or c.cloud_node.size.id == size.id)
+ def _nodes_down(self, size):
+ return sum(1 for down in
+ pykka.get_all(rec.actor.in_state('down') for rec in
+ self.cloud_nodes.nodes.itervalues()
+ if size is None or rec.cloud_node.size.id == size.id)
+ if down)
+
def _nodes_up(self, size):
- up = self._nodes_booting(size) + self._nodes_booted(size)
+ up = (self._nodes_booting(size) + self._nodes_booted(size)) - self._nodes_down(size)
return up
def _total_price(self):
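
The new _nodes_down counts cloud nodes whose monitor actor reports the 'down' state; since proxy calls return futures, pykka.get_all resolves them all before summing, and the result is subtracted from the booting/booted total in _nodes_up. The same counting pattern in isolation (the Probe actor here is purely illustrative):

    import pykka

    class Probe(pykka.ThreadingActor):
        def __init__(self, is_down):
            super(Probe, self).__init__()
            self._is_down = is_down

        def in_state(self, state):
            # Stands in for ComputeNodeMonitorActor.in_state('down').
            return self._is_down if state == 'down' else not self._is_down

    actors = [Probe.start(flag).proxy() for flag in (True, False, True)]
    # Each proxy call returns a future; get_all blocks until every one resolves.
    down = sum(1 for d in pykka.get_all(a.in_state('down') for a in actors) if d)
    print(down)  # 2
    pykka.ActorRegistry.stop_all()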
diff --git a/services/nodemanager/tests/test_computenode_dispatch_slurm.py b/services/nodemanager/tests/test_computenode_dispatch_slurm.py
index 8648783..212bb3d 100644
--- a/services/nodemanager/tests/test_computenode_dispatch_slurm.py
+++ b/services/nodemanager/tests/test_computenode_dispatch_slurm.py
@@ -3,6 +3,7 @@
from __future__ import absolute_import, print_function
import subprocess
+import time
import unittest
import mock
@@ -87,3 +88,75 @@ class SLURMComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
proc_mock.return_value = 'drain\n'
super(SLURMComputeNodeShutdownActorTestCase,
self).test_arvados_node_cleaned_after_shutdown()
+
+class SLURMComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
+ unittest.TestCase):
+
+ def make_mocks(self, node_num):
+ self.shutdowns = testutil.MockShutdownTimer()
+ self.shutdowns._set_state(False, 300)
+ self.timer = mock.MagicMock(name='timer_mock')
+ self.updates = mock.MagicMock(name='update_mock')
+ self.cloud_mock = testutil.cloud_node_mock(node_num)
+ self.subscriber = mock.Mock(name='subscriber_mock')
+ self.cloud_client = mock.MagicMock(name='cloud_client')
+ self.cloud_client.broken.return_value = False
+
+ def make_actor(self, node_num=1, arv_node=None, start_time=None):
+ if not hasattr(self, 'cloud_mock'):
+ self.make_mocks(node_num)
+ if start_time is None:
+ start_time = time.time()
+ self.node_actor = slurm_dispatch.ComputeNodeMonitorActor.start(
+ self.cloud_mock, start_time, self.shutdowns,
+ testutil.cloud_node_fqdn, self.timer, self.updates, self.cloud_client,
+ arv_node, boot_fail_after=300).proxy()
+ self.node_actor.subscribe(self.subscriber).get(self.TIMEOUT)
+
+ @mock.patch("subprocess.check_output")
+ def test_resume_node(self, check_output):
+ arv_node = testutil.arvados_node_mock()
+ self.make_actor(arv_node=arv_node)
+ check_output.return_value = "drain\n"
+ self.node_actor.resume_node().get(self.TIMEOUT)
+ check_output.assert_has_calls([mock.call(['sinfo', '--noheader', '-o', '%t', '-n', arv_node['hostname']]),
+ mock.call(['sinfo', '--noheader', '-o', '%t', '-n', arv_node['hostname']]),
+ mock.call(['sinfo', '--noheader', '-o', '%t', '-n', arv_node['hostname']]),
+ mock.call(['scontrol', 'update', 'NodeName=' + arv_node['hostname'], 'State=RESUME'])],
+ any_order=True)
+ self.assertEqual(4, check_output.call_count)
+
+ @mock.patch("subprocess.check_output")
+ def test_no_resume_idle_node(self, check_output):
+ arv_node = testutil.arvados_node_mock()
+ self.make_actor(arv_node=arv_node)
+ check_output.return_value = "idle\n"
+ self.node_actor.resume_node().get(self.TIMEOUT)
+ check_output.assert_has_calls([mock.call(['sinfo', '--noheader', '-o', '%t', '-n', arv_node['hostname']]),
+ mock.call(['sinfo', '--noheader', '-o', '%t', '-n', arv_node['hostname']]),
+ mock.call(['sinfo', '--noheader', '-o', '%t', '-n', arv_node['hostname']])],
+ any_order=True)
+ self.assertEqual(3, check_output.call_count)
+
+
+ @mock.patch("subprocess.check_output")
+ def test_resume_node_exception(self, check_output):
+ arv_node = testutil.arvados_node_mock()
+ self.make_actor(arv_node=arv_node)
+ check_output.side_effect = Exception()
+ check_output.return_value = "drain\n"
+ self.node_actor.resume_node().get(self.TIMEOUT)
+ check_output.assert_has_calls([mock.call(['sinfo', '--noheader', '-o', '%t', '-n', arv_node['hostname']]),
+ mock.call(['sinfo', '--noheader', '-o', '%t', '-n', arv_node['hostname']]),
+ mock.call(['sinfo', '--noheader', '-o', '%t', '-n', arv_node['hostname']])],
+ any_order=True)
+ self.assertEqual(3, check_output.call_count)
+
+
+ @mock.patch("subprocess.check_output")
+ def test_shutdown_down_node(self, check_output):
+ check_output.return_value = "down\n"
+ self.make_actor()
+ self.assertEquals('shutdown window is not open.', self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+ self.shutdowns._set_state(True, 600)
+ self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
diff --git a/services/nodemanager/tests/test_daemon.py b/services/nodemanager/tests/test_daemon.py
index 2daca08..00e05a1 100644
--- a/services/nodemanager/tests/test_daemon.py
+++ b/services/nodemanager/tests/test_daemon.py
@@ -279,7 +279,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
self.last_setup.arvados_node.get.return_value = arv_node
return self.last_setup
- def test_no_new_node_when_booted_node_not_usable(self):
+ def test_new_node_when_booted_node_not_usable(self):
cloud_node = testutil.cloud_node_mock(4)
arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
setup = self.start_node_boot(cloud_node, arv_node)
@@ -290,7 +290,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
self.daemon.update_server_wishlist(
[testutil.MockSize(1)]).get(self.TIMEOUT)
self.stop_proxy(self.daemon)
- self.assertEqual(1, self.node_setup.start.call_count)
+ self.assertEqual(2, self.node_setup.start.call_count)
def test_no_duplication_when_booting_node_listed_fast(self):
# Test that we don't start two ComputeNodeMonitorActors when
@@ -718,3 +718,25 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
# test for that.
self.assertEqual(2, sizecounts[small.id])
self.assertEqual(1, sizecounts[big.id])
+
+ @mock.patch("arvnodeman.daemon.NodeManagerDaemonActor._resume_node")
+ def test_resume_drained_nodes(self, resume_node):
+ cloud_node = testutil.cloud_node_mock(1)
+ arv_node = testutil.arvados_node_mock(1, info={"ec2_instance_id": "1", "slurm_state": "down"})
+ self.make_daemon([cloud_node], [arv_node])
+ resume_node.assert_called_with(self.daemon.cloud_nodes.get(self.TIMEOUT).nodes.values()[0])
+ self.stop_proxy(self.daemon)
+
+ @mock.patch("arvnodeman.daemon.NodeManagerDaemonActor._resume_node")
+ def test_no_resume_shutdown_nodes(self, resume_node):
+ cloud_node = testutil.cloud_node_mock(1)
+ arv_node = testutil.arvados_node_mock(1, info={"ec2_instance_id": "1", "slurm_state": "down"})
+
+ self.make_daemon([cloud_node], [])
+
+ self.node_shutdown = mock.MagicMock(name='shutdown_mock')
+ self.daemon.shutdowns.get(self.TIMEOUT)[cloud_node.id] = self.node_shutdown
+
+ self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
+ self.stop_proxy(self.daemon)
+ resume_node.assert_not_called()
-----------------------------------------------------------------------
hooks/post-receive