[ARVADOS] created: 241ef75ec8b6cf5dd14ce19fa068462adaeb0386

Git user git at public.curoverse.com
Wed Apr 6 15:36:16 EDT 2016


        at  241ef75ec8b6cf5dd14ce19fa068462adaeb0386 (commit)


commit 241ef75ec8b6cf5dd14ce19fa068462adaeb0386
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Apr 6 11:22:31 2016 -0400

    8799: Nodes whose slurm_state is "down" are checked with sinfo and either re-enabled or treated as valid for shutdown.
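
For context, the whole change rests on two SLURM CLI interactions, which the
patch factors into a shared SlurmMixin: querying a node's state with sinfo and
updating it with scontrol. A minimal standalone sketch of those calls (not
part of the patch; the node name is hypothetical):

import subprocess

def get_slurm_state(nodename):
    # Raw state token, e.g. "idle\n", "alloc\n", "drain\n". check_output
    # does not strip the trailing newline, which is why every member of
    # the state sets in the patch below ends in "\n".
    return subprocess.check_output(
        ['sinfo', '--noheader', '-o', '%t', '-n', nodename])

def set_slurm_state(nodename, state, *args):
    cmd = ['scontrol', 'update', 'NodeName=' + nodename, 'State=' + state]
    cmd.extend(args)
    subprocess.check_output(cmd)

if __name__ == '__main__':
    # Resume only from the drain states; RESUME on an idle/alloc node errors.
    if get_slurm_state('compute0') in ('drain\n', 'drng\n'):
        set_slurm_state('compute0', 'RESUME')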

diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
index 2ddfb0a..552ed01 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
@@ -219,7 +219,17 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
                 return orig_func(self, *args, **kwargs)
         return stop_wrapper
 
-    @ComputeNodeStateChangeBase._finish_on_exception
+    def _cancel_on_exception(orig_func):
+        @functools.wraps(orig_func)
+        def finish_wrapper(self, *args, **kwargs):
+            try:
+                return orig_func(self, *args, **kwargs)
+            except Exception as error:
+                self._logger.error("Actor error %s", error)
+                self._later.cancel_shutdown("Unhandled exception %s" % error)
+        return finish_wrapper
+
+    @_cancel_on_exception
     @_stop_if_window_closed
     @RetryMixin._retry()
     def shutdown_node(self):
@@ -387,6 +397,9 @@ class ComputeNodeMonitorActor(config.actor_class):
         else:
             return "node is not idle."
 
+    def resume_node(self):
+        pass
+
     def consider_shutdown(self):
         try:
             next_opening = self._shutdowns.next_opening()
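
A note on the hunk above: shutdown_node swaps _finish_on_exception for the new
_cancel_on_exception, so an unhandled error now cancels the shutdown (via
cancel_shutdown) instead of marking it finished. A self-contained sketch of
the pattern, with the decorator hoisted to module level for brevity and mock
stand-ins for the actor plumbing:

import functools
import mock

def _cancel_on_exception(orig_func):
    @functools.wraps(orig_func)
    def finish_wrapper(self, *args, **kwargs):
        try:
            return orig_func(self, *args, **kwargs)
        except Exception as error:
            self._logger.error("Actor error %s", error)
            self._later.cancel_shutdown("Unhandled exception %s" % error)
    return finish_wrapper

class FakeShutdownActor(object):
    # Illustrative stub, not the real actor class.
    def __init__(self):
        self._logger = mock.Mock()
        self._later = mock.Mock()

    @_cancel_on_exception
    def shutdown_node(self):
        raise RuntimeError("cloud API unreachable")

actor = FakeShutdownActor()
actor.shutdown_node()                       # exception is swallowed...
assert actor._later.cancel_shutdown.called  # ...and routed to cancel_shutdown
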
diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py b/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
index 255e50a..9ef54b3 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
@@ -6,16 +6,28 @@ import subprocess
 import time
 
 from . import \
-    ComputeNodeSetupActor, ComputeNodeUpdateActor, ComputeNodeMonitorActor
+    ComputeNodeSetupActor, ComputeNodeUpdateActor
 from . import ComputeNodeShutdownActor as ShutdownActorBase
+from . import ComputeNodeMonitorActor as MonitorActorBase
 from .. import RetryMixin
 
-class ComputeNodeShutdownActor(ShutdownActorBase):
+class SlurmMixin(object):
     SLURM_END_STATES = frozenset(['down\n', 'down*\n',
                                   'drain\n', 'drain*\n',
                                   'fail\n', 'fail*\n'])
     SLURM_DRAIN_STATES = frozenset(['drain\n', 'drng\n'])
 
+    def _set_node_state(self, nodename, state, *args):
+        cmd = ['scontrol', 'update', 'NodeName=' + nodename,
+               'State=' + state]
+        cmd.extend(args)
+        subprocess.check_output(cmd)
+
+    def _get_slurm_state(self, nodename):
+        return subprocess.check_output(['sinfo', '--noheader', '-o', '%t', '-n', nodename])
+
+
+class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
     def on_start(self):
         arv_node = self._arvados_node()
         if arv_node is None:
@@ -27,21 +39,12 @@ class ComputeNodeShutdownActor(ShutdownActorBase):
             self._logger.info("Draining SLURM node %s", self._nodename)
             self._later.issue_slurm_drain()
 
-    def _set_node_state(self, state, *args):
-        cmd = ['scontrol', 'update', 'NodeName=' + self._nodename,
-               'State=' + state]
-        cmd.extend(args)
-        subprocess.check_output(cmd)
-
-    def _get_slurm_state(self):
-        return subprocess.check_output(['sinfo', '--noheader', '-o', '%t', '-n', self._nodename])
-
     @RetryMixin._retry((subprocess.CalledProcessError,))
     def cancel_shutdown(self, reason):
         if self._nodename:
-            if self._get_slurm_state() in self.SLURM_DRAIN_STATES:
+            if self._get_slurm_state(self._nodename) in self.SLURM_DRAIN_STATES:
                 # Resume from "drng" or "drain"
-                self._set_node_state('RESUME')
+                self._set_node_state(self._nodename, 'RESUME')
             else:
                 # Node is in a state such as 'idle' or 'alloc' so don't
                 # try to resume it because that will just raise an error.
@@ -51,16 +54,36 @@ class ComputeNodeShutdownActor(ShutdownActorBase):
     @RetryMixin._retry((subprocess.CalledProcessError,))
     @ShutdownActorBase._stop_if_window_closed
     def issue_slurm_drain(self):
-        self._set_node_state('DRAIN', 'Reason=Node Manager shutdown')
+        self._set_node_state(self._nodename, 'DRAIN', 'Reason=Node Manager shutdown')
         self._logger.info("Waiting for SLURM node %s to drain", self._nodename)
         self._later.await_slurm_drain()
 
     @RetryMixin._retry((subprocess.CalledProcessError,))
     @ShutdownActorBase._stop_if_window_closed
     def await_slurm_drain(self):
-        output = self._get_slurm_state()
+        output = self._get_slurm_state(self._nodename)
         if output in self.SLURM_END_STATES:
             self._later.shutdown_node()
         else:
             self._timer.schedule(time.time() + 10,
                                  self._later.await_slurm_drain)
+
+
+class ComputeNodeMonitorActor(SlurmMixin, MonitorActorBase):
+
+    def shutdown_eligible(self):
+        if (self.arvados_node is not None and
+            self._get_slurm_state(self.arvados_node['hostname']) in self.SLURM_END_STATES):
+            return True
+        else:
+            return super(ComputeNodeMonitorActor, self).shutdown_eligible()
+
+    def resume_node(self):
+        try:
+            if (self.arvados_node is not None and
+                self._get_slurm_state(self.arvados_node['hostname']) in self.SLURM_DRAIN_STATES):
+                # Resume from "drng" or "drain"
+                self._set_node_state(self.arvados_node['hostname'], 'RESUME')
+        except Exception as error:
+            self._logger.warn(
+                "Exception reenabling node: %s", error, exc_info=error)
diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index 7976f21..2c2ebe8 100644
--- a/services/nodemanager/arvnodeman/daemon.py
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -151,6 +151,9 @@ class NodeManagerDaemonActor(actor_class):
     def _update_poll_time(self, poll_key):
         self.last_polls[poll_key] = time.time()
 
+    def _resume_node(self, node_record):
+        node_record.actor.resume_node()
+
     def _pair_nodes(self, node_record, arvados_node):
         self._logger.info("Cloud node %s is now paired with Arvados node %s",
                           node_record.cloud_node.name, arvados_node['uuid'])
@@ -218,6 +221,11 @@ class NodeManagerDaemonActor(actor_class):
                 if cloud_rec.actor.offer_arvados_pair(arv_node).get():
                     self._pair_nodes(cloud_rec, arv_node)
                     break
+        for rec in self.arvados_nodes.nodes.itervalues():
+            if (rec.arvados_node["info"].get("slurm_state") == "down" and
+                rec.cloud_node is not None and
+                rec.cloud_node.id not in self.shutdowns):
+                self._resume_node(rec)
 
     def _nodes_booting(self, size):
         s = sum(1
@@ -238,8 +246,15 @@ class NodeManagerDaemonActor(actor_class):
                   for c in self.cloud_nodes.nodes.itervalues()
                   if size is None or c.cloud_node.size.id == size.id)
 
+    def _nodes_down(self, size):
+        return sum(1 for down in
+                   pykka.get_all(rec.actor.in_state('down') for rec in
+                                 self.cloud_nodes.nodes.itervalues()
+                                 if size is None or rec.cloud_node.size.id == size.id)
+                   if down)
+
     def _nodes_up(self, size):
-        up = self._nodes_booting(size) + self._nodes_booted(size)
+        up = (self._nodes_booting(size) + self._nodes_booted(size)) - self._nodes_down(size)
         return up
 
     def _total_price(self):
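
The daemon changes above have two effects: paired nodes whose Arvados record
reports slurm_state "down" (and that are not already being shut down) get a
resume_node attempt on each update_arvados_nodes pass, and _nodes_down
subtracts SLURM-down nodes from the up count so the wishlist logic boots
replacements for them (hence the flipped expectation in
test_new_node_when_booted_node_not_usable below). A worked example of the new
arithmetic with hypothetical numbers; pykka.get_all simply resolves a list of
actor futures to their values:

booting, booted = 1, 3
in_state_down = [False, True, False]        # resolved in_state('down') replies
nodes_down = sum(1 for down in in_state_down if down)  # -> 1
nodes_up = (booting + booted) - nodes_down             # -> 3
# One node is down, so the daemon sees 3 usable nodes against the server
# wishlist and will boot a replacement rather than counting the dead node.
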
diff --git a/services/nodemanager/tests/test_computenode_dispatch_slurm.py b/services/nodemanager/tests/test_computenode_dispatch_slurm.py
index 8648783..212bb3d 100644
--- a/services/nodemanager/tests/test_computenode_dispatch_slurm.py
+++ b/services/nodemanager/tests/test_computenode_dispatch_slurm.py
@@ -3,6 +3,7 @@
 from __future__ import absolute_import, print_function
 
 import subprocess
+import time
 import unittest
 
 import mock
@@ -87,3 +88,75 @@ class SLURMComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
         proc_mock.return_value = 'drain\n'
         super(SLURMComputeNodeShutdownActorTestCase,
               self).test_arvados_node_cleaned_after_shutdown()
+
+class SLURMComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
+                                      unittest.TestCase):
+
+    def make_mocks(self, node_num):
+        self.shutdowns = testutil.MockShutdownTimer()
+        self.shutdowns._set_state(False, 300)
+        self.timer = mock.MagicMock(name='timer_mock')
+        self.updates = mock.MagicMock(name='update_mock')
+        self.cloud_mock = testutil.cloud_node_mock(node_num)
+        self.subscriber = mock.Mock(name='subscriber_mock')
+        self.cloud_client = mock.MagicMock(name='cloud_client')
+        self.cloud_client.broken.return_value = False
+
+    def make_actor(self, node_num=1, arv_node=None, start_time=None):
+        if not hasattr(self, 'cloud_mock'):
+            self.make_mocks(node_num)
+        if start_time is None:
+            start_time = time.time()
+        self.node_actor = slurm_dispatch.ComputeNodeMonitorActor.start(
+            self.cloud_mock, start_time, self.shutdowns,
+            testutil.cloud_node_fqdn, self.timer, self.updates, self.cloud_client,
+            arv_node, boot_fail_after=300).proxy()
+        self.node_actor.subscribe(self.subscriber).get(self.TIMEOUT)
+
+    @mock.patch("subprocess.check_output")
+    def test_resume_node(self, check_output):
+        arv_node = testutil.arvados_node_mock()
+        self.make_actor(arv_node=arv_node)
+        check_output.return_value = "drain\n"
+        self.node_actor.resume_node().get(self.TIMEOUT)
+        check_output.assert_has_calls([mock.call(['sinfo', '--noheader', '-o', '%t', '-n', arv_node['hostname']]),
+                                       mock.call(['sinfo', '--noheader', '-o', '%t', '-n', arv_node['hostname']]),
+                                       mock.call(['sinfo', '--noheader', '-o', '%t', '-n', arv_node['hostname']]),
+                                       mock.call(['scontrol', 'update', 'NodeName=' + arv_node['hostname'], 'State=RESUME'])],
+                                      any_order=True)
+        self.assertEqual(4, check_output.call_count)
+
+    @mock.patch("subprocess.check_output")
+    def test_no_resume_idle_node(self, check_output):
+        arv_node = testutil.arvados_node_mock()
+        self.make_actor(arv_node=arv_node)
+        check_output.return_value = "idle\n"
+        self.node_actor.resume_node().get(self.TIMEOUT)
+        check_output.assert_has_calls([mock.call(['sinfo', '--noheader', '-o', '%t', '-n', arv_node['hostname']]),
+                                       mock.call(['sinfo', '--noheader', '-o', '%t', '-n', arv_node['hostname']]),
+                                       mock.call(['sinfo', '--noheader', '-o', '%t', '-n', arv_node['hostname']])],
+                                      any_order=True)
+        self.assertEqual(3, check_output.call_count)
+
+
+    @mock.patch("subprocess.check_output")
+    def test_resume_node_exception(self, check_output):
+        arv_node = testutil.arvados_node_mock()
+        self.make_actor(arv_node=arv_node)
+        check_output.side_effect = Exception()
+        check_output.return_value = "drain\n"
+        self.node_actor.resume_node().get(self.TIMEOUT)
+        check_output.assert_has_calls([mock.call(['sinfo', '--noheader', '-o', '%t', '-n', arv_node['hostname']]),
+                                       mock.call(['sinfo', '--noheader', '-o', '%t', '-n', arv_node['hostname']]),
+                                       mock.call(['sinfo', '--noheader', '-o', '%t', '-n', arv_node['hostname']])],
+                                      any_order=True)
+        self.assertEqual(3, check_output.call_count)
+
+
+    @mock.patch("subprocess.check_output")
+    def test_shutdown_down_node(self, check_output):
+        check_output.return_value = "down\n"
+        self.make_actor()
+        self.assertEquals('shutdown window is not open.', self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+        self.shutdowns._set_state(True, 600)
+        self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
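
The monitor tests above never touch a real SLURM installation: patching
subprocess.check_output replaces both sinfo and scontrol at the call site. A
minimal standalone illustration of the same stubbing technique (not from the
test suite):

import subprocess
import mock

with mock.patch('subprocess.check_output') as check_output:
    check_output.return_value = 'drain\n'
    state = subprocess.check_output(
        ['sinfo', '--noheader', '-o', '%t', '-n', 'compute0'])
    assert state == 'drain\n'
    check_output.assert_called_once_with(
        ['sinfo', '--noheader', '-o', '%t', '-n', 'compute0'])
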
diff --git a/services/nodemanager/tests/test_daemon.py b/services/nodemanager/tests/test_daemon.py
index 2daca08..00e05a1 100644
--- a/services/nodemanager/tests/test_daemon.py
+++ b/services/nodemanager/tests/test_daemon.py
@@ -279,7 +279,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.last_setup.arvados_node.get.return_value = arv_node
         return self.last_setup
 
-    def test_no_new_node_when_booted_node_not_usable(self):
+    def test_new_node_when_booted_node_not_usable(self):
         cloud_node = testutil.cloud_node_mock(4)
         arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
         setup = self.start_node_boot(cloud_node, arv_node)
@@ -290,7 +290,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.daemon.update_server_wishlist(
             [testutil.MockSize(1)]).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
-        self.assertEqual(1, self.node_setup.start.call_count)
+        self.assertEqual(2, self.node_setup.start.call_count)
 
     def test_no_duplication_when_booting_node_listed_fast(self):
         # Test that we don't start two ComputeNodeMonitorActors when
@@ -718,3 +718,25 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         # test for that.
         self.assertEqual(2, sizecounts[small.id])
         self.assertEqual(1, sizecounts[big.id])
+
+    @mock.patch("arvnodeman.daemon.NodeManagerDaemonActor._resume_node")
+    def test_resume_drained_nodes(self, resume_node):
+        cloud_node = testutil.cloud_node_mock(1)
+        arv_node = testutil.arvados_node_mock(1, info={"ec2_instance_id": "1", "slurm_state": "down"})
+        self.make_daemon([cloud_node], [arv_node])
+        resume_node.assert_called_with(self.daemon.cloud_nodes.get(self.TIMEOUT).nodes.values()[0])
+        self.stop_proxy(self.daemon)
+
+    @mock.patch("arvnodeman.daemon.NodeManagerDaemonActor._resume_node")
+    def test_no_resume_shutdown_nodes(self, resume_node):
+        cloud_node = testutil.cloud_node_mock(1)
+        arv_node = testutil.arvados_node_mock(1, info={"ec2_instance_id": "1", "slurm_state": "down"})
+
+        self.make_daemon([cloud_node], [])
+
+        self.node_shutdown = mock.MagicMock(name='shutdown_mock')
+        self.daemon.shutdowns.get(self.TIMEOUT)[cloud_node.id] = self.node_shutdown
+
+        self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
+        self.stop_proxy(self.daemon)
+        resume_node.assert_not_called()

-----------------------------------------------------------------------


hooks/post-receive