[ARVADOS] updated: 8141501a6ef0a3cf4f40da14671c31c0257472e4

git at public.curoverse.com git at public.curoverse.com
Wed Dec 10 08:05:32 EST 2014


Summary of changes:
 .../arvnodeman/computenode/dispatch/__init__.py    | 11 ++++-
 .../arvnodeman/computenode/dispatch/slurm.py       |  4 +-
 services/nodemanager/arvnodeman/config.py          |  2 +
 services/nodemanager/arvnodeman/daemon.py          | 49 +++++++++++++++-------
 services/nodemanager/arvnodeman/launcher.py        |  1 +
 services/nodemanager/doc/ec2.example.cfg           |  6 +++
 .../nodemanager/tests/test_computenode_dispatch.py | 13 +++++-
 .../tests/test_computenode_dispatch_slurm.py       | 20 ++++++---
 services/nodemanager/tests/test_daemon.py          | 46 ++++++++++++++++++--
 services/nodemanager/tests/testutil.py             | 18 +++++++-
 10 files changed, 139 insertions(+), 31 deletions(-)

       via  8141501a6ef0a3cf4f40da14671c31c0257472e4 (commit)
       via  dd47cf79c71bb4cc5b90f3752d0b79110278e197 (commit)
       via  db32ebf3b3bf067870e5c4a1883a08bc1e77e6b8 (commit)
      from  8453812fac25bae327b8fa52c5a920b1d921e8be (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 8141501a6ef0a3cf4f40da14671c31c0257472e4
Merge: 8453812 dd47cf7
Author: Brett Smith <brett at curoverse.com>
Date:   Wed Dec 10 08:04:08 2014 -0500

    Merge branch '4293-node-manager-timed-bootstrap-wip'
    
    Closes #4293, #4732.  Refs #4380.


commit dd47cf79c71bb4cc5b90f3752d0b79110278e197
Author: Brett Smith <brett at curoverse.com>
Date:   Fri Dec 5 17:27:37 2014 -0500

    4293: Node Manager shuts down nodes that fail to boot.
    
    This helps Node Manager detect and correct when a node fails to
    bootstrap.

diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
index ae0a65b..c79d8f9 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
@@ -136,12 +136,18 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
     This actor simply destroys a cloud node, retrying as needed.
     """
     def __init__(self, timer_actor, cloud_client, node_monitor,
-                 retry_wait=1, max_retry_wait=180):
+                 cancellable=True, retry_wait=1, max_retry_wait=180):
+        # If a ShutdownActor is cancellable, it will ask the
+        # ComputeNodeMonitorActor if it's still eligible before taking each
+        # action, and stop the shutdown process if the node is no longer
+        # eligible.  Normal shutdowns based on job demand should be
+        # cancellable; shutdowns based on node misbehavior should not.
         super(ComputeNodeShutdownActor, self).__init__(
             'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
         self._cloud = cloud_client
         self._monitor = node_monitor.proxy()
         self.cloud_node = self._monitor.cloud_node.get()
+        self.cancellable = cancellable
         self.success = None
 
     def on_start(self):
@@ -154,7 +160,8 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
     def _stop_if_window_closed(orig_func):
         @functools.wraps(orig_func)
         def wrapper(self, *args, **kwargs):
-            if not self._monitor.shutdown_eligible().get():
+            if (self.cancellable and
+                  (not self._monitor.shutdown_eligible().get())):
                 self._logger.info(
                     "Cloud node %s shutdown cancelled - no longer eligible.",
                     self.cloud_node.id)
diff --git a/services/nodemanager/arvnodeman/config.py b/services/nodemanager/arvnodeman/config.py
index 079e623..f018015 100644
--- a/services/nodemanager/arvnodeman/config.py
+++ b/services/nodemanager/arvnodeman/config.py
@@ -6,6 +6,7 @@ import ConfigParser
 import importlib
 import logging
 import ssl
+import sys
 
 import arvados
 import httplib2
@@ -42,6 +43,7 @@ class NodeManagerConfig(ConfigParser.SafeConfigParser):
                        'poll_time': '60',
                        'max_poll_time': '300',
                        'poll_stale_after': '600',
+                       'boot_fail_after': str(sys.maxint),
                        'node_stale_after': str(60 * 60 * 2)},
             'Logging': {'file': '/dev/stderr',
                         'level': 'WARNING'},
diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index 9f22568..d03d145 100644
--- a/services/nodemanager/arvnodeman/daemon.py
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -26,14 +26,16 @@ class _BaseNodeTracker(object):
         self.nodes = {}
         self.orphans = {}
 
-    def __getitem__(self, key):
-        return self.nodes[key]
-
-    def __len__(self):
-        return len(self.nodes)
+    # Proxy the methods listed below to self.nodes.
+    def _proxy_method(name):
+        method = getattr(dict, name)
+        @functools.wraps(method, ('__name__', '__doc__'))
+        def wrapper(self, *args, **kwargs):
+            return method(self.nodes, *args, **kwargs)
+        return wrapper
 
-    def get(self, key, default=None):
-        return self.nodes.get(key, default)
+    for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
+        locals()[_method_name] = _proxy_method(_method_name)
 
     def record_key(self, record):
         return self.item_key(getattr(record, self.RECORD_ATTR))
@@ -96,7 +98,9 @@ class NodeManagerDaemonActor(actor_class):
                  cloud_nodes_actor, cloud_update_actor, timer_actor,
                  arvados_factory, cloud_factory,
                  shutdown_windows, min_nodes, max_nodes,
-                 poll_stale_after=600, node_stale_after=7200,
+                 poll_stale_after=600,
+                 boot_fail_after=1800,
+                 node_stale_after=7200,
                  node_setup_class=dispatch.ComputeNodeSetupActor,
                  node_shutdown_class=dispatch.ComputeNodeShutdownActor,
                  node_actor_class=dispatch.ComputeNodeMonitorActor):
@@ -115,6 +119,7 @@ class NodeManagerDaemonActor(actor_class):
         self.min_nodes = min_nodes
         self.max_nodes = max_nodes
         self.poll_stale_after = poll_stale_after
+        self.boot_fail_after = boot_fail_after
         self.node_stale_after = node_stale_after
         self.last_polls = {}
         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
@@ -269,15 +274,15 @@ class NodeManagerDaemonActor(actor_class):
         return pykka.get_all([getattr(actor, name) for name in attr_names])
 
     def node_up(self, setup_proxy):
-        cloud_node, arvados_node = self._get_actor_attrs(
-            setup_proxy, 'cloud_node', 'arvados_node')
+        cloud_node = setup_proxy.cloud_node.get()
         del self.booting[setup_proxy.actor_ref.actor_urn]
         setup_proxy.stop()
         record = self.cloud_nodes.get(cloud_node.id)
         if record is None:
             record = self._new_node(cloud_node)
             self.booted[cloud_node.id] = record
-        self._pair_nodes(record, arvados_node)
+        self._timer.schedule(time.time() + self.boot_fail_after,
+                             self._later.shutdown_unpaired_node, cloud_node.id)
 
     @_check_poll_freshness
     def stop_booting_node(self):
@@ -292,19 +297,31 @@ class NodeManagerDaemonActor(actor_class):
                     self._later.stop_booting_node()
                 break
 
-    @_check_poll_freshness
-    def node_can_shutdown(self, node_actor):
-        if self._nodes_excess() < 1:
-            return None
+    def _begin_node_shutdown(self, node_actor, cancellable):
         cloud_node_id = node_actor.cloud_node.get().id
         if cloud_node_id in self.shutdowns:
             return None
         shutdown = self._node_shutdown.start(
             timer_actor=self._timer, cloud_client=self._new_cloud(),
-            node_monitor=node_actor.actor_ref).proxy()
+            node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
         self.shutdowns[cloud_node_id] = shutdown
         shutdown.subscribe(self._later.node_finished_shutdown)
 
+    @_check_poll_freshness
+    def node_can_shutdown(self, node_actor):
+        if self._nodes_excess() > 0:
+            self._begin_node_shutdown(node_actor, cancellable=True)
+
+    def shutdown_unpaired_node(self, cloud_node_id):
+        for record_dict in [self.cloud_nodes, self.booted]:
+            if cloud_node_id in record_dict:
+                record = record_dict[cloud_node_id]
+                break
+        else:
+            return None
+        if record.arvados_node is None:
+            self._begin_node_shutdown(record.actor, cancellable=False)
+
     def node_finished_shutdown(self, shutdown_actor):
         success, cloud_node = self._get_actor_attrs(shutdown_actor, 'success',
                                                     'cloud_node')
diff --git a/services/nodemanager/arvnodeman/launcher.py b/services/nodemanager/arvnodeman/launcher.py
index 9f5e162..5fa404f 100644
--- a/services/nodemanager/arvnodeman/launcher.py
+++ b/services/nodemanager/arvnodeman/launcher.py
@@ -119,6 +119,7 @@ def main(args=None):
         config.getint('Daemon', 'min_nodes'),
         config.getint('Daemon', 'max_nodes'),
         config.getint('Daemon', 'poll_stale_after'),
+        config.getint('Daemon', 'boot_fail_after'),
         config.getint('Daemon', 'node_stale_after'),
         node_setup, node_shutdown, node_monitor).proxy()
 
diff --git a/services/nodemanager/doc/ec2.example.cfg b/services/nodemanager/doc/ec2.example.cfg
index 0f9caca..024ed2b 100644
--- a/services/nodemanager/doc/ec2.example.cfg
+++ b/services/nodemanager/doc/ec2.example.cfg
@@ -27,6 +27,12 @@ max_poll_time = 300
 # information is too outdated.
 poll_stale_after = 600
 
+# If Node Manager boots a cloud node and it does not pair with an Arvados
+# node within this many seconds, assume there was a cloud bootstrap failure
+# and shut it down.  Note that normal shutdown windows apply (see the Cloud
+# section), so this should be shorter than the first shutdown window value.
+boot_fail_after = 1800
+
 # "Node stale time" affects two related behaviors.
 # 1. If a compute node has been running for at least this long, but it
 # isn't paired with an Arvados node, do not shut it down, but leave it alone.
diff --git a/services/nodemanager/tests/test_computenode_dispatch.py b/services/nodemanager/tests/test_computenode_dispatch.py
index 7f6988d..c86dcfd 100644
--- a/services/nodemanager/tests/test_computenode_dispatch.py
+++ b/services/nodemanager/tests/test_computenode_dispatch.py
@@ -106,14 +106,14 @@ class ComputeNodeShutdownActorMixin(testutil.ActorTestMixin):
         self.cloud_node = cloud_node
         self.arvados_node = arvados_node
 
-    def make_actor(self):
+    def make_actor(self, cancellable=True):
         if not hasattr(self, 'timer'):
             self.make_mocks()
         monitor_actor = dispatch.ComputeNodeMonitorActor.start(
             self.cloud_node, time.time(), self.shutdowns, self.timer,
             self.updates, self.arvados_node)
         self.shutdown_actor = self.ACTOR_CLASS.start(
-            self.timer, self.cloud_client, monitor_actor).proxy()
+            self.timer, self.cloud_client, monitor_actor, cancellable).proxy()
         self.monitor_actor = monitor_actor.proxy()
 
     def check_success_flag(self, expected, allow_msg_count=1):
@@ -126,6 +126,15 @@ class ComputeNodeShutdownActorMixin(testutil.ActorTestMixin):
         else:
             self.fail("success flag {} is not {}".format(last_flag, expected))
 
+    def test_uncancellable_shutdown(self, *mocks):
+        self.make_mocks(shutdown_open=False)
+        self.cloud_client.destroy_node.return_value = False
+        self.make_actor(cancellable=False)
+        self.check_success_flag(None, 0)
+        self.shutdowns._set_state(True, 600)
+        self.cloud_client.destroy_node.return_value = True
+        self.check_success_flag(True)
+
 
 class ComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
                                        unittest.TestCase):
diff --git a/services/nodemanager/tests/test_daemon.py b/services/nodemanager/tests/test_daemon.py
index 31a682f..0f146a1 100644
--- a/services/nodemanager/tests/test_daemon.py
+++ b/services/nodemanager/tests/test_daemon.py
@@ -22,14 +22,14 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.cloud_factory = mock.MagicMock(name='cloud_mock')
         self.cloud_factory().node_start_time.return_value = time.time()
         self.cloud_updates = mock.MagicMock(name='updates_mock')
-        self.timer = testutil.MockTimer()
+        self.timer = testutil.MockTimer(deliver_immediately=False)
         self.node_setup = mock.MagicMock(name='setup_mock')
         self.node_shutdown = mock.MagicMock(name='shutdown_mock')
         self.daemon = nmdaemon.NodeManagerDaemonActor.start(
             self.server_wishlist_poller, self.arvados_nodes_poller,
             self.cloud_nodes_poller, self.cloud_updates, self.timer,
             self.arv_factory, self.cloud_factory,
-            [54, 5, 1], min_nodes, max_nodes, 600, 3600,
+            [54, 5, 1], min_nodes, max_nodes, 600, 1800, 3600,
             self.node_setup, self.node_shutdown).proxy()
         if cloud_nodes is not None:
             self.daemon.update_cloud_nodes(cloud_nodes).get(self.TIMEOUT)
@@ -44,6 +44,12 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
     def alive_monitor_count(self):
         return sum(1 for actor in self.monitor_list() if actor.is_alive())
 
+    def assertShutdownCancellable(self, expected=True):
+        self.assertTrue(self.node_shutdown.start.called)
+        self.assertIs(expected,
+                      self.node_shutdown.start.call_args[1]['cancellable'],
+                      "ComputeNodeShutdownActor incorrectly cancellable")
+
     def test_easy_node_creation(self):
         size = testutil.MockSize(1)
         self.make_daemon(want_sizes=[size])
@@ -195,8 +201,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         monitor = self.monitor_list()[0].proxy()
         self.daemon.update_server_wishlist([])
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
-        self.assertTrue(self.node_shutdown.start.called,
-                        "daemon did not shut down booted node on offer")
+        self.assertShutdownCancellable(True)
         shutdown = self.node_shutdown.start().proxy()
         shutdown.cloud_node.get.return_value = cloud_node
         self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
@@ -210,6 +215,37 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.assertTrue(self.node_setup.start.called,
                         "second node not started after booted node stopped")
 
+    def test_booted_node_shut_down_when_never_listed(self):
+        setup = self.start_node_boot()
+        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.assertEqual(1, self.alive_monitor_count())
+        self.assertFalse(self.node_shutdown.start.called)
+        self.timer.deliver()
+        self.stop_proxy(self.daemon)
+        self.assertShutdownCancellable(False)
+
+    def test_booted_node_shut_down_when_never_paired(self):
+        cloud_node = testutil.cloud_node_mock(2)
+        setup = self.start_node_boot(cloud_node)
+        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.assertEqual(1, self.alive_monitor_count())
+        self.daemon.update_cloud_nodes([cloud_node])
+        self.timer.deliver()
+        self.stop_proxy(self.daemon)
+        self.assertShutdownCancellable(False)
+
+    def test_node_that_pairs_not_considered_failed_boot(self):
+        cloud_node = testutil.cloud_node_mock(3)
+        arv_node = testutil.arvados_node_mock(3)
+        setup = self.start_node_boot(cloud_node, arv_node)
+        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.assertEqual(1, self.alive_monitor_count())
+        self.daemon.update_cloud_nodes([cloud_node])
+        self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
+        self.timer.deliver()
+        self.stop_proxy(self.daemon)
+        self.assertFalse(self.node_shutdown.start.called)
+
     def test_booting_nodes_shut_down(self):
         self.make_daemon(want_sizes=[testutil.MockSize(1)])
         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
@@ -317,6 +353,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.assertTrue(new_node.stop_if_no_cloud_node.called)
         self.daemon.node_up(new_node).get(self.TIMEOUT)
         self.assertTrue(new_node.stop.called)
+        self.timer.deliver()
         self.assertTrue(
             self.daemon.actor_ref.actor_stopped.wait(self.TIMEOUT))
 
@@ -325,5 +362,6 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.make_daemon(want_sizes=[size])
         self.daemon.shutdown().get(self.TIMEOUT)
         self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
+        self.timer.deliver()
         self.stop_proxy(self.daemon)
         self.assertEqual(1, self.node_setup.start.call_count)
diff --git a/services/nodemanager/tests/testutil.py b/services/nodemanager/tests/testutil.py
index 56f22c8..30808ac 100644
--- a/services/nodemanager/tests/testutil.py
+++ b/services/nodemanager/tests/testutil.py
@@ -2,6 +2,7 @@
 
 from __future__ import absolute_import, print_function
 
+import threading
 import time
 
 import mock
@@ -62,8 +63,23 @@ class MockSize(object):
 
 
 class MockTimer(object):
+    def __init__(self, deliver_immediately=True):
+        self.deliver_immediately = deliver_immediately
+        self.messages = []
+        self.lock = threading.Lock()
+
+    def deliver(self):
+        with self.lock:
+            to_deliver = self.messages
+            self.messages = []
+        for callback, args, kwargs in to_deliver:
+            callback(*args, **kwargs)
+
     def schedule(self, want_time, callback, *args, **kwargs):
-        return callback(*args, **kwargs)
+        with self.lock:
+            self.messages.append((callback, args, kwargs))
+        if self.deliver_immediately:
+            self.deliver()
 
 
 class ActorTestMixin(object):

commit db32ebf3b3bf067870e5c4a1883a08bc1e77e6b8
Author: Brett Smith <brett at curoverse.com>
Date:   Fri Dec 5 17:45:13 2014 -0500

    4380: Node Manager SLURM dispatcher proceeds from more states.
    
    Per discussion with Ward.  Our main concern is that Node Manager
    shouldn't shut down nodes that are doing work.  We feel comfortable
    broadening the definition of "not doing work" to this set of states.

diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py b/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
index 27397e5..6eaa8b9 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
@@ -10,6 +10,8 @@ from . import \
 from . import ComputeNodeShutdownActor as ShutdownActorBase
 
 class ComputeNodeShutdownActor(ShutdownActorBase):
+    SLURM_END_STATES = frozenset(['down\n', 'down*\n', 'drain\n', 'fail\n'])
+
     def on_start(self):
         arv_node = self._monitor.arvados_node.get()
         if arv_node is None:
@@ -42,7 +44,7 @@ class ComputeNodeShutdownActor(ShutdownActorBase):
     def await_slurm_drain(self):
         output = subprocess.check_output(
             ['sinfo', '--noheader', '-o', '%t', '-n', self._nodename])
-        if output == 'drain\n':
+        if output in self.SLURM_END_STATES:
             self._later.shutdown_node()
         else:
             self._timer.schedule(time.time() + 10,
diff --git a/services/nodemanager/tests/test_computenode_dispatch_slurm.py b/services/nodemanager/tests/test_computenode_dispatch_slurm.py
index ccac8b2..93cc60d 100644
--- a/services/nodemanager/tests/test_computenode_dispatch_slurm.py
+++ b/services/nodemanager/tests/test_computenode_dispatch_slurm.py
@@ -22,21 +22,31 @@ class SLURMComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
         for s in args:
             self.assertIn(s, slurm_cmd)
 
-    def check_success_after_reset(self, proc_mock):
+    def check_success_after_reset(self, proc_mock, end_state='drain\n'):
         self.make_mocks(arvados_node=testutil.arvados_node_mock(63))
         self.make_actor()
         self.check_success_flag(None, 0)
         self.check_success_flag(None, 0)
         # Order is critical here: if the mock gets called when no return value
         # or side effect is set, we may invoke a real subprocess.
-        proc_mock.return_value = 'drain\n'
+        proc_mock.return_value = end_state
         proc_mock.side_effect = None
         self.check_success_flag(True, 3)
         self.check_slurm_got_args(proc_mock, 'compute63')
 
-    def test_wait_for_drained_state(self, proc_mock):
-        proc_mock.return_value = 'drng\n'
-        self.check_success_after_reset(proc_mock)
+    def make_wait_state_test(start_state='drng\n', end_state='drain\n'):
+        def test(self, proc_mock):
+            proc_mock.return_value = start_state
+            self.check_success_after_reset(proc_mock, end_state)
+        return test
+
+    for wait_state in ['alloc\n', 'drng\n', 'idle*\n']:
+        locals()['test_wait_while_' + wait_state.strip()
+                 ] = make_wait_state_test(start_state=wait_state)
+
+    for end_state in ['down\n', 'down*\n', 'drain\n', 'fail\n']:
+        locals()['test_wait_until_' + end_state.strip()
+                 ] = make_wait_state_test(end_state=end_state)
 
     def test_retry_failed_slurm_calls(self, proc_mock):
         proc_mock.side_effect = subprocess.CalledProcessError(1, ["mock"])

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list