[ARVADOS] updated: 6c68141eb50255128cf38b5717b15b16f2a8cdff

git at public.curoverse.com
Tue Nov 18 15:04:29 EST 2014


Summary of changes:
 .../arvnodeman/computenode/dispatch/__init__.py    |  65 +++++++++----
 .../arvnodeman/computenode/dispatch/slurm.py       |  49 ++++++++++
 services/nodemanager/arvnodeman/config.py          |  11 +++
 services/nodemanager/arvnodeman/daemon.py          |  49 ++++++----
 services/nodemanager/arvnodeman/launcher.py        |   9 +-
 services/nodemanager/doc/ec2.example.cfg           |   5 +
 .../nodemanager/tests/test_computenode_dispatch.py | 103 ++++++++++++++-------
 .../tests/test_computenode_dispatch_slurm.py       |  58 ++++++++++++
 services/nodemanager/tests/test_config.py          |  18 ++++
 services/nodemanager/tests/test_daemon.py          |  51 ++++++++++
 services/nodemanager/tests/testutil.py             |   6 ++
 11 files changed, 349 insertions(+), 75 deletions(-)
 create mode 100644 services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
 create mode 100644 services/nodemanager/tests/test_computenode_dispatch_slurm.py

       via  6c68141eb50255128cf38b5717b15b16f2a8cdff (commit)
       via  ffc7c1b20530dda13c1adf3aa48122ac881bbc29 (commit)
       via  6874da0dd56d6c0320880dfb3bd4da34a3c0a7d3 (commit)
       via  4af4a2cf12d3b87a4fdbd09115bc258961b706bb (commit)
       via  22381d218295075e63c06f0b60bcb24cee7b30b6 (commit)
      from  1a2a05a4714a4bc538800d08d353b0306d5e49ad (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit 6c68141eb50255128cf38b5717b15b16f2a8cdff
Merge: 1a2a05a ffc7c1b
Author: Brett Smith <brett at curoverse.com>
Date:   Tue Nov 18 15:04:13 2014 -0500

    Merge branch '4380-node-manager-slurm-drain-wip'
    
    Closes #4380, #4519.


commit ffc7c1b20530dda13c1adf3aa48122ac881bbc29
Author: Brett Smith <brett at curoverse.com>
Date:   Thu Nov 13 15:34:54 2014 -0500

    4380: Add SLURM dispatcher to Node Manager.
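
The heart of the new dispatcher is a drain-then-destroy sequence: mark the
node DRAIN with scontrol, then poll sinfo until it reports the drained
state, and only then destroy the cloud node.  A condensed, standalone
sketch of that sequence (synchronous for clarity; the actor in the diff
below drives the same steps asynchronously through retried actor messages,
and drain_and_wait is a hypothetical helper, not part of the commit):

    from __future__ import absolute_import, print_function

    import subprocess
    import time

    def drain_and_wait(nodename, poll_interval=10):
        # Put the node in DRAIN so SLURM stops scheduling new work on it.
        subprocess.check_output(
            ['scontrol', 'update', 'NodeName=' + nodename,
             'State=DRAIN', 'Reason=Node Manager shutdown'])
        # Poll sinfo until the node reports the fully drained state.
        while True:
            state = subprocess.check_output(
                ['sinfo', '--noheader', '-o', '%t', '-n', nodename])
            if state == 'drain\n':
                return  # safe to destroy the cloud node now
            time.sleep(poll_interval)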

diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py b/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
new file mode 100644
index 0000000..27397e5
--- /dev/null
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
@@ -0,0 +1,49 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import subprocess
+import time
+
+from . import \
+    ComputeNodeSetupActor, ComputeNodeUpdateActor, ComputeNodeMonitorActor
+from . import ComputeNodeShutdownActor as ShutdownActorBase
+
+class ComputeNodeShutdownActor(ShutdownActorBase):
+    def on_start(self):
+        arv_node = self._monitor.arvados_node.get()
+        if arv_node is None:
+            return super(ComputeNodeShutdownActor, self).on_start()
+        else:
+            self._nodename = arv_node['hostname']
+            self._logger.info("Draining SLURM node %s", self._nodename)
+            self._later.issue_slurm_drain()
+
+    def _set_node_state(self, state, *args):
+        cmd = ['scontrol', 'update', 'NodeName=' + self._nodename,
+               'State=' + state]
+        cmd.extend(args)
+        subprocess.check_output(cmd)
+
+    @ShutdownActorBase._retry((subprocess.CalledProcessError,))
+    def cancel_shutdown(self):
+        self._set_node_state('RESUME')
+        return super(ComputeNodeShutdownActor, self).cancel_shutdown()
+
+    @ShutdownActorBase._stop_if_window_closed
+    @ShutdownActorBase._retry((subprocess.CalledProcessError,))
+    def issue_slurm_drain(self):
+        self._set_node_state('DRAIN', 'Reason=Node Manager shutdown')
+        self._logger.info("Waiting for SLURM node %s to drain", self._nodename)
+        self._later.await_slurm_drain()
+
+    @ShutdownActorBase._stop_if_window_closed
+    @ShutdownActorBase._retry((subprocess.CalledProcessError,))
+    def await_slurm_drain(self):
+        output = subprocess.check_output(
+            ['sinfo', '--noheader', '-o', '%t', '-n', self._nodename])
+        if output == 'drain\n':
+            self._later.shutdown_node()
+        else:
+            self._timer.schedule(time.time() + 10,
+                                 self._later.await_slurm_drain)
diff --git a/services/nodemanager/arvnodeman/config.py b/services/nodemanager/arvnodeman/config.py
index 24fd828..079e623 100644
--- a/services/nodemanager/arvnodeman/config.py
+++ b/services/nodemanager/arvnodeman/config.py
@@ -68,6 +68,17 @@ class NodeManagerConfig(ConfigParser.SafeConfigParser):
                 for key in self.options('Logging')
                 if key not in self.LOGGING_NONLEVELS}
 
+    def dispatch_classes(self):
+        mod_name = 'arvnodeman.computenode.dispatch'
+        if self.has_option('Daemon', 'dispatcher'):
+            mod_name = '{}.{}'.format(mod_name,
+                                      self.get('Daemon', 'dispatcher'))
+        module = importlib.import_module(mod_name)
+        return (module.ComputeNodeSetupActor,
+                module.ComputeNodeShutdownActor,
+                module.ComputeNodeUpdateActor,
+                module.ComputeNodeMonitorActor)
+
     def new_arvados_client(self):
         if self.has_option('Daemon', 'certs_file'):
             certs_file = self.get('Daemon', 'certs_file')
diff --git a/services/nodemanager/arvnodeman/launcher.py b/services/nodemanager/arvnodeman/launcher.py
index d2f4afe..9f5e162 100644
--- a/services/nodemanager/arvnodeman/launcher.py
+++ b/services/nodemanager/arvnodeman/launcher.py
@@ -12,7 +12,6 @@ import daemon
 import pykka
 
 from . import config as nmconfig
-from .computenode.dispatch import ComputeNodeUpdateActor
 from .daemon import NodeManagerDaemonActor
 from .jobqueue import JobQueueMonitorActor, ServerCalculator
 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
@@ -107,10 +106,11 @@ def main(args=None):
         signal.signal(sigcode, shutdown_signal)
 
     setup_logging(config.get('Logging', 'file'), **config.log_levels())
+    node_setup, node_shutdown, node_update, node_monitor = \
+        config.dispatch_classes()
     timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
         launch_pollers(config)
-    cloud_node_updater = ComputeNodeUpdateActor.start(
-        config.new_cloud_client).proxy()
+    cloud_node_updater = node_update.start(config.new_cloud_client).proxy()
     node_daemon = NodeManagerDaemonActor.start(
         job_queue_poller, arvados_node_poller, cloud_node_poller,
         cloud_node_updater, timer,
@@ -119,7 +119,8 @@ def main(args=None):
         config.getint('Daemon', 'min_nodes'),
         config.getint('Daemon', 'max_nodes'),
         config.getint('Daemon', 'poll_stale_after'),
-        config.getint('Daemon', 'node_stale_after')).proxy()
+        config.getint('Daemon', 'node_stale_after'),
+        node_setup, node_shutdown, node_monitor).proxy()
 
     signal.pause()
     daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
diff --git a/services/nodemanager/doc/ec2.example.cfg b/services/nodemanager/doc/ec2.example.cfg
index f4b27af..0f9caca 100644
--- a/services/nodemanager/doc/ec2.example.cfg
+++ b/services/nodemanager/doc/ec2.example.cfg
@@ -2,6 +2,11 @@
 # All times are in seconds unless specified otherwise.
 
 [Daemon]
+# The dispatcher can customize the start and stop procedure for
+# cloud nodes.  For example, the SLURM dispatcher drains nodes
+# through SLURM before shutting them down.
+#dispatcher = slurm
+
 # Node Manager will ensure that there are at least this many nodes
 # running at all times.
 min_nodes = 0
diff --git a/services/nodemanager/tests/test_computenode_dispatch.py b/services/nodemanager/tests/test_computenode_dispatch.py
index b8239f3..7f6988d 100644
--- a/services/nodemanager/tests/test_computenode_dispatch.py
+++ b/services/nodemanager/tests/test_computenode_dispatch.py
@@ -93,8 +93,7 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
                          subscriber.call_args[0][0].actor_ref.actor_urn)
 
 
-class ComputeNodeShutdownActorTestCase(testutil.ActorTestMixin,
-                                       unittest.TestCase):
+class ComputeNodeShutdownActorMixin(testutil.ActorTestMixin):
     def make_mocks(self, cloud_node=None, arvados_node=None,
                    shutdown_open=True):
         self.timer = testutil.MockTimer()
@@ -113,7 +112,7 @@ class ComputeNodeShutdownActorTestCase(testutil.ActorTestMixin,
         monitor_actor = dispatch.ComputeNodeMonitorActor.start(
             self.cloud_node, time.time(), self.shutdowns, self.timer,
             self.updates, self.arvados_node)
-        self.shutdown_actor = dispatch.ComputeNodeShutdownActor.start(
+        self.shutdown_actor = self.ACTOR_CLASS.start(
             self.timer, self.cloud_client, monitor_actor).proxy()
         self.monitor_actor = monitor_actor.proxy()
 
@@ -127,6 +126,11 @@ class ComputeNodeShutdownActorTestCase(testutil.ActorTestMixin,
         else:
             self.fail("success flag {} is not {}".format(last_flag, expected))
 
+
+class ComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
+                                       unittest.TestCase):
+    ACTOR_CLASS = dispatch.ComputeNodeShutdownActor
+
     def test_easy_shutdown(self):
         self.make_actor()
         self.check_success_flag(True)
diff --git a/services/nodemanager/tests/test_computenode_dispatch_slurm.py b/services/nodemanager/tests/test_computenode_dispatch_slurm.py
new file mode 100644
index 0000000..ccac8b2
--- /dev/null
+++ b/services/nodemanager/tests/test_computenode_dispatch_slurm.py
@@ -0,0 +1,58 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+import subprocess
+import unittest
+
+import mock
+
+import arvnodeman.computenode.dispatch.slurm as slurm_dispatch
+from . import testutil
+from .test_computenode_dispatch import ComputeNodeShutdownActorMixin
+
+@mock.patch('subprocess.check_output')
+class SLURMComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
+                                            unittest.TestCase):
+    ACTOR_CLASS = slurm_dispatch.ComputeNodeShutdownActor
+
+    def check_slurm_got_args(self, proc_mock, *args):
+        self.assertTrue(proc_mock.called)
+        slurm_cmd = proc_mock.call_args[0][0]
+        for s in args:
+            self.assertIn(s, slurm_cmd)
+
+    def check_success_after_reset(self, proc_mock):
+        self.make_mocks(arvados_node=testutil.arvados_node_mock(63))
+        self.make_actor()
+        self.check_success_flag(None, 0)
+        self.check_success_flag(None, 0)
+        # Order is critical here: if the mock gets called when no return value
+        # or side effect is set, we may invoke a real subprocess.
+        proc_mock.return_value = 'drain\n'
+        proc_mock.side_effect = None
+        self.check_success_flag(True, 3)
+        self.check_slurm_got_args(proc_mock, 'compute63')
+
+    def test_wait_for_drained_state(self, proc_mock):
+        proc_mock.return_value = 'drng\n'
+        self.check_success_after_reset(proc_mock)
+
+    def test_retry_failed_slurm_calls(self, proc_mock):
+        proc_mock.side_effect = subprocess.CalledProcessError(1, ["mock"])
+        self.check_success_after_reset(proc_mock)
+
+    def test_slurm_bypassed_when_no_arvados_node(self, proc_mock):
+        # Test we correctly handle a node that failed to bootstrap.
+        proc_mock.return_value = 'idle\n'
+        self.make_actor()
+        self.check_success_flag(True)
+        self.assertFalse(proc_mock.called)
+
+    def test_node_undrained_when_shutdown_window_closes(self, proc_mock):
+        proc_mock.return_value = 'alloc\n'
+        self.make_mocks(arvados_node=testutil.arvados_node_mock(job_uuid=True))
+        self.make_actor()
+        self.check_success_flag(False, 2)
+        self.check_slurm_got_args(proc_mock, 'NodeName=compute99',
+                                  'State=RESUME')
diff --git a/services/nodemanager/tests/test_config.py b/services/nodemanager/tests/test_config.py
index 3aa9541..d43491e 100644
--- a/services/nodemanager/tests/test_config.py
+++ b/services/nodemanager/tests/test_config.py
@@ -6,6 +6,8 @@ import io
 import logging
 import unittest
 
+import arvnodeman.computenode.dispatch as dispatch
+import arvnodeman.computenode.dispatch.slurm as slurm_dispatch
 import arvnodeman.config as nmconfig
 
 class NodeManagerConfigTestCase(unittest.TestCase):
@@ -63,3 +65,19 @@ testlogger = INFO
         self.assertEqual({'level': logging.DEBUG,
                           'testlogger': logging.INFO},
                          config.log_levels())
+
+    def check_dispatch_classes(self, config, module):
+        setup, shutdown, update, monitor = config.dispatch_classes()
+        self.assertIs(setup, module.ComputeNodeSetupActor)
+        self.assertIs(shutdown, module.ComputeNodeShutdownActor)
+        self.assertIs(update, module.ComputeNodeUpdateActor)
+        self.assertIs(monitor, module.ComputeNodeMonitorActor)
+
+    def test_default_dispatch(self):
+        config = self.load_config()
+        self.check_dispatch_classes(config, dispatch)
+
+    def test_custom_dispatch(self):
+        config = self.load_config(
+            config_str=self.TEST_CONFIG + "[Daemon]\ndispatcher=slurm\n")
+        self.check_dispatch_classes(config, slurm_dispatch)

commit 6874da0dd56d6c0320880dfb3bd4da34a3c0a7d3
Author: Brett Smith <brett at curoverse.com>
Date:   Thu Nov 13 15:44:19 2014 -0500

    4380: Fix retry scheduling for Node Manager change methods.
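
The one-line change below hinges on the timer's interface: judging from
the other schedule() call sites in this series (time.time() + 10 in
slurm.py, the next_opening timestamp in the monitor), it takes an absolute
timestamp rather than a delay.  Recording when the failed call started and
scheduling at start_time + retry_wait therefore makes the retry fire
retry_wait seconds after the attempt began.  A small illustration of that
contract, with a hypothetical print-only timer standing in for the real
timer actor:

    from __future__ import absolute_import, print_function

    import time

    class PrintTimer(object):
        # Hypothetical stand-in for Node Manager's timer actor, which
        # expects an absolute UNIX timestamp as the first argument.
        def schedule(self, at_time, callback, *args, **kwargs):
            print("run {} in {:.1f}s".format(
                callback.__name__, at_time - time.time()))

    def flaky_change_method():
        raise RuntimeError("simulated client error")

    timer = PrintTimer()
    retry_wait = 5
    start_time = time.time()
    try:
        flaky_change_method()
    except RuntimeError:
        # Fixed behavior: schedule at an absolute time, relative to when
        # the attempt started rather than when the error was handled.
        timer.schedule(start_time + retry_wait, flaky_change_method)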

diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
index 8d0e6c3..ae0a65b 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
@@ -41,13 +41,14 @@ class ComputeNodeStateChangeBase(config.actor_class):
         def decorator(orig_func):
             @functools.wraps(orig_func)
             def wrapper(self, *args, **kwargs):
+                start_time = time.time()
                 try:
                     orig_func(self, *args, **kwargs)
                 except errors as error:
                     self._logger.warning(
                         "Client error: %s - waiting %s seconds",
                         error, self.retry_wait)
-                    self._timer.schedule(self.retry_wait,
+                    self._timer.schedule(start_time + self.retry_wait,
                                          getattr(self._later,
                                                  orig_func.__name__),
                                          *args, **kwargs)

commit 4af4a2cf12d3b87a4fdbd09115bc258961b706bb
Author: Brett Smith <brett at curoverse.com>
Date:   Thu Nov 13 12:58:14 2014 -0500

    4380: Node Manager shutdown actor is more robust.
    
    ComputeNodeShutdownActor now checks that destroying the cloud node
    succeeds.  Before retrying, it will check if the node is still
    eligible for shutdown, and abort if not.
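
Two pieces combine to make the actor more robust: a guard decorator
re-checks the monitor's shutdown_eligible() before every (possibly
retried) step and cancels the shutdown if the node has become busy again,
and destroy_node()'s return value is now checked, with a failure converted
into an exception so the existing retry machinery schedules another
attempt.  A compressed sketch of that combination, condensed from the diff
below (the retry decorator itself is omitted):

    import functools

    import libcloud.common.types as cloud_types

    def stop_if_window_closed(orig_func):
        # Re-check eligibility before each step; back out instead of
        # destroying a node that picked up work in the meantime.
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            if not self._monitor.shutdown_eligible().get():
                self._later.cancel_shutdown()  # records success = False
                return None
            return orig_func(self, *args, **kwargs)
        return wrapper

    @stop_if_window_closed
    def shutdown_node(self):
        if self._cloud.destroy_node(self.cloud_node):
            self.success = True
            self._finished()
        else:
            # destroy_node reported failure; raising lets the retry
            # decorator (omitted here) schedule another attempt.
            raise cloud_types.LibcloudError("destroy_node failed")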

diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
index 4dc3dcb..8d0e6c3 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
@@ -6,6 +6,7 @@ import functools
 import logging
 import time
 
+import libcloud.common.types as cloud_types
 import pykka
 
 from .. import arvados_node_fqdn, arvados_node_mtime, timestamp_fresh
@@ -133,19 +134,48 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
 
     This actor simply destroys a cloud node, retrying as needed.
     """
-    def __init__(self, timer_actor, cloud_client, cloud_node,
+    def __init__(self, timer_actor, cloud_client, node_monitor,
                  retry_wait=1, max_retry_wait=180):
         super(ComputeNodeShutdownActor, self).__init__(
             'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
         self._cloud = cloud_client
-        self.cloud_node = cloud_node
+        self._monitor = node_monitor.proxy()
+        self.cloud_node = self._monitor.cloud_node.get()
+        self.success = None
+
+    def on_start(self):
         self._later.shutdown_node()
 
+    def cancel_shutdown(self):
+        self.success = False
+        self._finished()
+
+    def _stop_if_window_closed(orig_func):
+        @functools.wraps(orig_func)
+        def wrapper(self, *args, **kwargs):
+            if not self._monitor.shutdown_eligible().get():
+                self._logger.info(
+                    "Cloud node %s shutdown cancelled - no longer eligible.",
+                    self.cloud_node.id)
+                self._later.cancel_shutdown()
+                return None
+            else:
+                return orig_func(self, *args, **kwargs)
+        return wrapper
+
+    @_stop_if_window_closed
     @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
     def shutdown_node(self):
-        self._cloud.destroy_node(self.cloud_node)
-        self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
-        self._finished()
+        if self._cloud.destroy_node(self.cloud_node):
+            self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
+            self.success = True
+            self._finished()
+        else:
+            # Force a retry.
+            raise cloud_types.LibcloudError("destroy_node failed")
+
+    # Make the decorator available to subclasses.
+    _stop_if_window_closed = staticmethod(_stop_if_window_closed)
 
 
 class ComputeNodeUpdateActor(config.actor_class):
diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index 7ff736f..9f22568 100644
--- a/services/nodemanager/arvnodeman/daemon.py
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -190,9 +190,8 @@ class NodeManagerDaemonActor(actor_class):
                     break
 
     def _nodes_up(self):
-        up = sum(len(nodelist) for nodelist in
-                 [self.cloud_nodes, self.booted, self.booting])
-        return up - len(self.shutdowns)
+        return sum(len(nodelist) for nodelist in
+                   [self.cloud_nodes, self.booted, self.booting])
 
     def _nodes_busy(self):
         return sum(1 for idle in
@@ -201,12 +200,21 @@ class NodeManagerDaemonActor(actor_class):
                    if idle is False)
 
     def _nodes_wanted(self):
-        return min(len(self.last_wishlist) + self._nodes_busy(),
-                   self.max_nodes) - self._nodes_up()
+        up_count = self._nodes_up()
+        over_max = up_count - self.max_nodes
+        if over_max >= 0:
+            return -over_max
+        else:
+            up_count -= len(self.shutdowns) + self._nodes_busy()
+            return len(self.last_wishlist) - up_count
 
     def _nodes_excess(self):
-        needed_nodes = self._nodes_busy() + len(self.last_wishlist)
-        return (self._nodes_up() - max(self.min_nodes, needed_nodes))
+        up_count = self._nodes_up() - len(self.shutdowns)
+        over_min = up_count - self.min_nodes
+        if over_min <= 0:
+            return over_min
+        else:
+            return up_count - self._nodes_busy() - len(self.last_wishlist)
 
     def update_server_wishlist(self, wishlist):
         self._update_poll_time('server_wishlist')
@@ -257,11 +265,12 @@ class NodeManagerDaemonActor(actor_class):
         if nodes_wanted > 1:
             self._later.start_node()
 
-    def _actor_nodes(self, node_actor):
-        return pykka.get_all([node_actor.cloud_node, node_actor.arvados_node])
+    def _get_actor_attrs(self, actor, *attr_names):
+        return pykka.get_all([getattr(actor, name) for name in attr_names])
 
     def node_up(self, setup_proxy):
-        cloud_node, arvados_node = self._actor_nodes(setup_proxy)
+        cloud_node, arvados_node = self._get_actor_attrs(
+            setup_proxy, 'cloud_node', 'arvados_node')
         del self.booting[setup_proxy.actor_ref.actor_urn]
         setup_proxy.stop()
         record = self.cloud_nodes.get(cloud_node.id)
@@ -287,19 +296,23 @@ class NodeManagerDaemonActor(actor_class):
     def node_can_shutdown(self, node_actor):
         if self._nodes_excess() < 1:
             return None
-        cloud_node, arvados_node = self._actor_nodes(node_actor)
-        if cloud_node.id in self.shutdowns:
+        cloud_node_id = node_actor.cloud_node.get().id
+        if cloud_node_id in self.shutdowns:
             return None
-        shutdown = self._node_shutdown.start(timer_actor=self._timer,
-                                             cloud_client=self._new_cloud(),
-                                             cloud_node=cloud_node).proxy()
-        self.shutdowns[cloud_node.id] = shutdown
+        shutdown = self._node_shutdown.start(
+            timer_actor=self._timer, cloud_client=self._new_cloud(),
+            node_monitor=node_actor.actor_ref).proxy()
+        self.shutdowns[cloud_node_id] = shutdown
         shutdown.subscribe(self._later.node_finished_shutdown)
 
     def node_finished_shutdown(self, shutdown_actor):
-        cloud_node_id = shutdown_actor.cloud_node.get().id
+        success, cloud_node = self._get_actor_attrs(shutdown_actor, 'success',
+                                                    'cloud_node')
         shutdown_actor.stop()
-        if cloud_node_id in self.booted:
+        cloud_node_id = cloud_node.id
+        if not success:
+            del self.shutdowns[cloud_node_id]
+        elif cloud_node_id in self.booted:
             self.booted.pop(cloud_node_id).actor.stop()
             del self.shutdowns[cloud_node_id]
 
diff --git a/services/nodemanager/tests/test_computenode_dispatch.py b/services/nodemanager/tests/test_computenode_dispatch.py
index 3a1c8ba..b8239f3 100644
--- a/services/nodemanager/tests/test_computenode_dispatch.py
+++ b/services/nodemanager/tests/test_computenode_dispatch.py
@@ -95,30 +95,63 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
 
 class ComputeNodeShutdownActorTestCase(testutil.ActorTestMixin,
                                        unittest.TestCase):
-    def make_mocks(self, cloud_node=None):
+    def make_mocks(self, cloud_node=None, arvados_node=None,
+                   shutdown_open=True):
         self.timer = testutil.MockTimer()
+        self.shutdowns = testutil.MockShutdownTimer()
+        self.shutdowns._set_state(shutdown_open, 300)
         self.cloud_client = mock.MagicMock(name='cloud_client')
+        self.updates = mock.MagicMock(name='update_mock')
         if cloud_node is None:
             cloud_node = testutil.cloud_node_mock()
         self.cloud_node = cloud_node
+        self.arvados_node = arvados_node
 
-    def make_actor(self, arv_node=None):
+    def make_actor(self):
         if not hasattr(self, 'timer'):
             self.make_mocks()
+        monitor_actor = dispatch.ComputeNodeMonitorActor.start(
+            self.cloud_node, time.time(), self.shutdowns, self.timer,
+            self.updates, self.arvados_node)
         self.shutdown_actor = dispatch.ComputeNodeShutdownActor.start(
-            self.timer, self.cloud_client, self.cloud_node).proxy()
+            self.timer, self.cloud_client, monitor_actor).proxy()
+        self.monitor_actor = monitor_actor.proxy()
+
+    def check_success_flag(self, expected, allow_msg_count=1):
+        # allow_msg_count is the number of internal messages that may
+        # need to be handled for shutdown to finish.
+        for try_num in range(1 + allow_msg_count):
+            last_flag = self.shutdown_actor.success.get(self.TIMEOUT)
+            if last_flag is expected:
+                break
+        else:
+            self.fail("success flag {} is not {}".format(last_flag, expected))
 
     def test_easy_shutdown(self):
         self.make_actor()
-        self.shutdown_actor.cloud_node.get(self.TIMEOUT)
-        self.stop_proxy(self.shutdown_actor)
+        self.check_success_flag(True)
         self.assertTrue(self.cloud_client.destroy_node.called)
 
+    def test_shutdown_cancelled_when_window_closes(self):
+        self.make_mocks(shutdown_open=False)
+        self.make_actor()
+        self.check_success_flag(False, 2)
+        self.assertFalse(self.cloud_client.destroy_node.called)
+
+    def test_shutdown_retries_when_cloud_fails(self):
+        self.make_mocks()
+        self.cloud_client.destroy_node.return_value = False
+        self.make_actor()
+        self.assertIsNone(self.shutdown_actor.success.get(self.TIMEOUT))
+        self.cloud_client.destroy_node.return_value = True
+        self.check_success_flag(True)
+
     def test_late_subscribe(self):
         self.make_actor()
         subscriber = mock.Mock(name='subscriber_mock')
         self.shutdown_actor.subscribe(subscriber).get(self.TIMEOUT)
         self.stop_proxy(self.shutdown_actor)
+        self.assertTrue(subscriber.called)
         self.assertEqual(self.shutdown_actor.actor_ref.actor_urn,
                          subscriber.call_args[0][0].actor_ref.actor_urn)
 
@@ -139,14 +172,8 @@ class ComputeNodeUpdateActorTestCase(testutil.ActorTestMixin,
 
 class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
                                       unittest.TestCase):
-    class MockShutdownTimer(object):
-        def _set_state(self, is_open, next_opening):
-            self.window_open = lambda: is_open
-            self.next_opening = lambda: next_opening
-
-
     def make_mocks(self, node_num):
-        self.shutdowns = self.MockShutdownTimer()
+        self.shutdowns = testutil.MockShutdownTimer()
         self.shutdowns._set_state(False, 300)
         self.timer = mock.MagicMock(name='timer_mock')
         self.updates = mock.MagicMock(name='update_mock')
diff --git a/services/nodemanager/tests/test_daemon.py b/services/nodemanager/tests/test_daemon.py
index 394fb88..31a682f 100644
--- a/services/nodemanager/tests/test_daemon.py
+++ b/services/nodemanager/tests/test_daemon.py
@@ -260,6 +260,57 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.stop_proxy(self.daemon)
         self.assertFalse(self.node_shutdown.start.called)
 
+    def test_node_shutdown_after_cancelled_shutdown(self):
+        cloud_node = testutil.cloud_node_mock(5)
+        self.make_daemon([cloud_node], [testutil.arvados_node_mock(5)])
+        self.assertEqual(1, self.alive_monitor_count())
+        monitor = self.monitor_list()[0].proxy()
+        shutdown_proxy = self.node_shutdown.start().proxy
+        shutdown_proxy().cloud_node.get.return_value = cloud_node
+        shutdown_proxy().success.get.return_value = False
+        shutdown_proxy.reset_mock()
+        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
+        self.assertTrue(shutdown_proxy.called)
+        self.daemon.node_finished_shutdown(shutdown_proxy()).get(self.TIMEOUT)
+        shutdown_proxy().success.get.return_value = True
+        shutdown_proxy.reset_mock()
+        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
+        self.assertTrue(shutdown_proxy.called)
+
+    def test_nodes_shutting_down_replaced_below_max_nodes(self):
+        cloud_node = testutil.cloud_node_mock(6)
+        self.make_daemon([cloud_node], [testutil.arvados_node_mock(6)])
+        self.assertEqual(1, self.alive_monitor_count())
+        monitor = self.monitor_list()[0].proxy()
+        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
+        self.assertTrue(self.node_shutdown.start.called)
+        self.daemon.update_server_wishlist(
+            [testutil.MockSize(6)]).get(self.TIMEOUT)
+        self.stop_proxy(self.daemon)
+        self.assertTrue(self.node_setup.start.called)
+
+    def test_nodes_shutting_down_not_replaced_at_max_nodes(self):
+        cloud_node = testutil.cloud_node_mock(7)
+        self.make_daemon([cloud_node], [testutil.arvados_node_mock(7)],
+                         max_nodes=1)
+        self.assertEqual(1, self.alive_monitor_count())
+        monitor = self.monitor_list()[0].proxy()
+        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
+        self.assertTrue(self.node_shutdown.start.called)
+        self.daemon.update_server_wishlist(
+            [testutil.MockSize(7)]).get(self.TIMEOUT)
+        self.stop_proxy(self.daemon)
+        self.assertFalse(self.node_setup.start.called)
+
+    def test_nodes_shutting_down_count_against_excess(self):
+        cloud_nodes = [testutil.cloud_node_mock(n) for n in [8, 9]]
+        arv_nodes = [testutil.arvados_node_mock(n) for n in [8, 9]]
+        self.make_daemon(cloud_nodes, arv_nodes, [testutil.MockSize(8)])
+        self.assertEqual(2, self.alive_monitor_count())
+        for mon_ref in self.monitor_list():
+            self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
+        self.assertEqual(1, self.node_shutdown.start.call_count)
+
     def test_clean_shutdown_waits_for_node_setup_finish(self):
         new_node = self.start_node_boot()
         self.daemon.shutdown().get(self.TIMEOUT)
diff --git a/services/nodemanager/tests/testutil.py b/services/nodemanager/tests/testutil.py
index a1b0658..56f22c8 100644
--- a/services/nodemanager/tests/testutil.py
+++ b/services/nodemanager/tests/testutil.py
@@ -41,6 +41,12 @@ def cloud_node_mock(node_num=99):
 def ip_address_mock(last_octet):
     return '10.20.30.{}'.format(last_octet)
 
+class MockShutdownTimer(object):
+    def _set_state(self, is_open, next_opening):
+        self.window_open = lambda: is_open
+        self.next_opening = lambda: next_opening
+
+
 class MockSize(object):
     def __init__(self, factor):
         self.id = 'z{}.test'.format(factor)

commit 22381d218295075e63c06f0b60bcb24cee7b30b6
Author: Brett Smith <brett at curoverse.com>
Date:   Thu Nov 13 09:47:07 2014 -0500

    4380: Node Manager monitors respond to shutdown_eligible message.
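
shutdown_eligible() replaces the private _shutdown_eligible() and folds
the shutdown-window check into the same answer, so other actors can ask a
monitor directly whether its node may be shut down (the SLURM shutdown
actor above does exactly this).  A minimal sketch of that query through a
pykka proxy, with node_may_shut_down as a hypothetical helper:

    def node_may_shut_down(monitor_actor):
        # Ask a running ComputeNodeMonitorActor whether its node can be
        # shut down right now.  Proxy method calls return pykka futures,
        # so .get() waits for the boolean answer.
        return monitor_actor.proxy().shutdown_eligible().get()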

diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
index d613ef1..4dc3dcb 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
@@ -246,8 +246,10 @@ class ComputeNodeMonitorActor(config.actor_class):
             result = result and not self.arvados_node['job_uuid']
         return result
 
-    def _shutdown_eligible(self):
-        if self.arvados_node is None:
+    def shutdown_eligible(self):
+        if not self._shutdowns.window_open():
+            return False
+        elif self.arvados_node is None:
             # If this is a new, unpaired node, it's eligible for
             # shutdown--we figure there was an error during bootstrap.
             return timestamp_fresh(self.cloud_node_start_time,
@@ -257,17 +259,15 @@ class ComputeNodeMonitorActor(config.actor_class):
 
     def consider_shutdown(self):
         next_opening = self._shutdowns.next_opening()
-        if self._shutdowns.window_open():
-            if self._shutdown_eligible():
-                self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
-                _notify_subscribers(self._later, self.subscribers)
-            else:
-                self._debug("Node %s shutdown window open but node busy.",
-                            self.cloud_node.id)
-        else:
+        if self.shutdown_eligible():
+            self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
+            _notify_subscribers(self._later, self.subscribers)
+        elif self._shutdowns.window_open():
+            self._debug("Node %s shutdown window open but node busy.",
+                        self.cloud_node.id)
+        elif self.last_shutdown_opening != next_opening:
             self._debug("Node %s shutdown window closed.  Next at %s.",
                         self.cloud_node.id, time.ctime(next_opening))
-        if self.last_shutdown_opening != next_opening:
             self._timer.schedule(next_opening, self._later.consider_shutdown)
             self.last_shutdown_opening = next_opening
 
diff --git a/services/nodemanager/tests/test_computenode_dispatch.py b/services/nodemanager/tests/test_computenode_dispatch.py
index ece186b..3a1c8ba 100644
--- a/services/nodemanager/tests/test_computenode_dispatch.py
+++ b/services/nodemanager/tests/test_computenode_dispatch.py
@@ -196,6 +196,16 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
         self.assertTrue(self.timer.schedule.called)
         self.assertEqual(300, self.timer.schedule.call_args[0][0])
 
+    def test_shutdown_window_close_scheduling(self):
+        self.make_actor()
+        self.shutdowns._set_state(False, 600)
+        self.timer.schedule.reset_mock()
+        self.node_actor.consider_shutdown().get(self.TIMEOUT)
+        self.stop_proxy(self.node_actor)
+        self.assertTrue(self.timer.schedule.called)
+        self.assertEqual(600, self.timer.schedule.call_args[0][0])
+        self.assertFalse(self.subscriber.called)
+
     def test_shutdown_subscription(self):
         self.make_actor()
         self.shutdowns._set_state(True, 600)
@@ -207,41 +217,31 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
     def test_shutdown_without_arvados_node(self):
         self.make_actor()
         self.shutdowns._set_state(True, 600)
-        self.node_actor.consider_shutdown().get(self.TIMEOUT)
-        self.assertTrue(self.subscriber.called)
+        self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
 
     def test_no_shutdown_without_arvados_node_and_old_cloud_node(self):
         self.make_actor(start_time=0)
         self.shutdowns._set_state(True, 600)
-        self.node_actor.consider_shutdown().get(self.TIMEOUT)
-        self.assertFalse(self.subscriber.called)
+        self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
 
-    def check_shutdown_rescheduled(self, window_open, next_window,
-                                   schedule_time=None):
-        self.shutdowns._set_state(window_open, next_window)
-        self.timer.schedule.reset_mock()
-        self.node_actor.consider_shutdown().get(self.TIMEOUT)
-        self.stop_proxy(self.node_actor)
-        self.assertTrue(self.timer.schedule.called)
-        if schedule_time is not None:
-            self.assertEqual(schedule_time, self.timer.schedule.call_args[0][0])
-        self.assertFalse(self.subscriber.called)
-
-    def test_shutdown_window_close_scheduling(self):
-        self.make_actor()
-        self.check_shutdown_rescheduled(False, 600, 600)
+    def test_no_shutdown_when_window_closed(self):
+        self.make_actor(3, testutil.arvados_node_mock(3, job_uuid=None))
+        self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
 
     def test_no_shutdown_when_node_running_job(self):
         self.make_actor(4, testutil.arvados_node_mock(4, job_uuid=True))
-        self.check_shutdown_rescheduled(True, 600)
+        self.shutdowns._set_state(True, 600)
+        self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
 
     def test_no_shutdown_when_node_state_unknown(self):
         self.make_actor(5, testutil.arvados_node_mock(5, info={}))
-        self.check_shutdown_rescheduled(True, 600)
+        self.shutdowns._set_state(True, 600)
+        self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
 
     def test_no_shutdown_when_node_state_stale(self):
         self.make_actor(6, testutil.arvados_node_mock(6, age=90000))
-        self.check_shutdown_rescheduled(True, 600)
+        self.shutdowns._set_state(True, 600)
+        self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
 
     def test_arvados_node_match(self):
         self.make_actor(2)

-----------------------------------------------------------------------


hooks/post-receive