[ARVADOS] created: 26751323e77005dc158b64e86c47bbb9459e6697

Git user git at public.curoverse.com
Thu Apr 14 16:02:21 EDT 2016


        at  26751323e77005dc158b64e86c47bbb9459e6697 (commit)


commit 26751323e77005dc158b64e86c47bbb9459e6697
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Apr 14 16:02:16 2016 -0400

    8953: Node manager shutdown policy change WIP.  Still fixing tests.

diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
index 552ed01..a2d24b3 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
@@ -14,6 +14,7 @@ from .. import \
     arvados_node_missing, RetryMixin
 from ...clientactor import _notify_subscribers
 from ... import config
+from .transitions import transitions
 
 class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
     """Base class for actors that change a compute node's state.
@@ -208,17 +209,6 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
         self._logger.info("Shutdown cancelled: %s.", reason)
         self._finished(success_flag=False)
 
-    def _stop_if_window_closed(orig_func):
-        @functools.wraps(orig_func)
-        def stop_wrapper(self, *args, **kwargs):
-            if (self.cancellable and
-                  (self._monitor.shutdown_eligible().get() is not True)):
-                self._later.cancel_shutdown(self.WINDOW_CLOSED)
-                return None
-            else:
-                return orig_func(self, *args, **kwargs)
-        return stop_wrapper
-
     def _cancel_on_exception(orig_func):
         @functools.wraps(orig_func)
         def finish_wrapper(self, *args, **kwargs):
@@ -230,7 +220,6 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
         return finish_wrapper
 
     @_cancel_on_exception
-    @_stop_if_window_closed
     @RetryMixin._retry()
     def shutdown_node(self):
         self._logger.info("Starting shutdown")
@@ -254,9 +243,6 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
         self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
         self._finished(success_flag=True)
 
-    # Make the decorator available to subclasses.
-    _stop_if_window_closed = staticmethod(_stop_if_window_closed)
-
 
 class ComputeNodeUpdateActor(config.actor_class):
     """Actor to dispatch one-off cloud management requests.
@@ -369,51 +355,49 @@ class ComputeNodeMonitorActor(config.actor_class):
             result = result and not self.arvados_node['job_uuid']
         return result
 
-    def shutdown_eligible(self):
-        """Return True if eligible for shutdown, or a string explaining why the node
-        is not eligible for shutdown."""
+    def consider_shutdown(self):
+        try:
+            # Collect states and then consult state transition table
+            # whether we should shut down.  Possible states are:
+            #
+            # crunch_worker_state = ['unpaired', 'busy', 'idle', 'down']
+            # window = ["open", "closed"]
+            # boot_grace = ["boot wait", "boot exceeded"]
+            # idle_grace = ["not idle", "idle wait", "idle exceeded"]
+
+            if self.arvados_node is None:
+                crunch_worker_state = 'unpaired'
+            elif self.arvados_node['crunch_worker_state']:
+                crunch_worker_state = self.arvados_node['crunch_worker_state']
+            else:
+                self._debug("Node is paired but crunch_worker_state is null")
+                return
+
+            window = "open" if self._shutdowns.window_open() else "closed"
 
-        if not self._shutdowns.window_open():
-            return "shutdown window is not open."
-        if self.arvados_node is None:
-            # Node is unpaired.
-            # If it hasn't pinged Arvados after boot_fail seconds, shut it down
             if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
-                return "node is still booting, will be considered a failed boot at %s" % time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.cloud_node_start_time + self.boot_fail_after))
+                boot_grace = "boot wait"
             else:
-                return True
-        missing = arvados_node_missing(self.arvados_node, self.node_stale_after)
-        if missing and self._cloud.broken(self.cloud_node):
-            # Node is paired, but Arvados says it is missing and the cloud says the node
-            # is in an error state, so shut it down.
-            return True
-        if missing is None and self._cloud.broken(self.cloud_node):
-            self._logger.info(
-                "Cloud node considered 'broken' but paired node %s last_ping_at is None, " +
-                "cannot check node_stale_after (node may be shut down and we just haven't gotten the message yet).",
-                self.arvados_node['uuid'])
-        if self.in_state('idle'):
-            return True
-        else:
-            return "node is not idle."
+                boot_grace = "boot exceeded"
 
-    def resume_node(self):
-        pass
+            # API server side not implemented yet.
+            idle_grace = 'idle exceeded'
+
+            node_state = (crunch_worker_state, window, boot_grace, idle_grace)
+            self._debug("Considering shutdown, node state is %s", node_state)
+            eligible = transitions[node_state]
 
-    def consider_shutdown(self):
-        try:
             next_opening = self._shutdowns.next_opening()
-            eligible = self.shutdown_eligible()
-            if eligible is True:
+            if eligible:
                 self._debug("Suggesting shutdown.")
                 _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
-            elif self._shutdowns.window_open():
-                self._debug("Cannot shut down because %s", eligible)
             elif self.last_shutdown_opening != next_opening:
                 self._debug("Shutdown window closed.  Next at %s.",
                             time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
                 self._timer.schedule(next_opening, self._later.consider_shutdown)
                 self.last_shutdown_opening = next_opening
+            else:
+              self._debug("Won't shut down")
         except Exception:
             self._logger.exception("Unexpected exception")
 
diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py b/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
index 41919db..058afda 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
@@ -6,9 +6,8 @@ import subprocess
 import time
 
 from . import \
-    ComputeNodeSetupActor, ComputeNodeUpdateActor
+    ComputeNodeSetupActor, ComputeNodeUpdateActor, ComputeNodeMonitorActor
 from . import ComputeNodeShutdownActor as ShutdownActorBase
-from . import ComputeNodeMonitorActor as MonitorActorBase
 from .. import RetryMixin
 
 class SlurmMixin(object):
@@ -52,43 +51,20 @@ class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
         return super(ComputeNodeShutdownActor, self).cancel_shutdown(reason)
 
     @RetryMixin._retry((subprocess.CalledProcessError,))
-    @ShutdownActorBase._stop_if_window_closed
     def issue_slurm_drain(self):
-        self._set_node_state(self._nodename, 'DRAIN', 'Reason=Node Manager shutdown')
-        self._logger.info("Waiting for SLURM node %s to drain", self._nodename)
-        self._later.await_slurm_drain()
+        if self.cancel_reason is None:
+            self._set_node_state(self._nodename, 'DRAIN', 'Reason=Node Manager shutdown')
+            self._logger.info("Waiting for SLURM node %s to drain", self._nodename)
+            self._later.await_slurm_drain()
 
     @RetryMixin._retry((subprocess.CalledProcessError,))
-    @ShutdownActorBase._stop_if_window_closed
     def await_slurm_drain(self):
         output = self._get_slurm_state(self._nodename)
         if output in self.SLURM_END_STATES:
             self._later.shutdown_node()
+        elif output in ("alloc\n", "idle\n"):
+            # Not in "drng" so cancel self.
+            self.cancel_shutdown("slurm state is idle")
         else:
             self._timer.schedule(time.time() + 10,
                                  self._later.await_slurm_drain)
-
-
-class ComputeNodeMonitorActor(SlurmMixin, MonitorActorBase):
-
-    def shutdown_eligible(self):
-        if self.arvados_node is not None:
-            state = self._get_slurm_state(self.arvados_node['hostname'])
-            # Automatically eligible for shutdown if it's down or failed, but
-            # not drain to avoid a race condition with resume_node().
-            if state in self.SLURM_END_STATES:
-                if state in self.SLURM_DRAIN_STATES:
-                    return "node is draining"
-                else:
-                    return True
-        return super(ComputeNodeMonitorActor, self).shutdown_eligible()
-
-    def resume_node(self):
-        try:
-            if (self.arvados_node is not None and
-                self._get_slurm_state(self.arvados_node['hostname']) in self.SLURM_DRAIN_STATES):
-                # Resume from "drng" or "drain"
-                self._set_node_state(self.arvados_node['hostname'], 'RESUME')
-        except Exception as error:
-            self._logger.warn(
-                "Exception reenabling node: %s", error, exc_info=error)
diff --git a/services/nodemanager/tests/test_computenode_dispatch.py b/services/nodemanager/tests/test_computenode_dispatch.py
index 14cd85e..b722f3c 100644
--- a/services/nodemanager/tests/test_computenode_dispatch.py
+++ b/services/nodemanager/tests/test_computenode_dispatch.py
@@ -206,6 +206,7 @@ class ComputeNodeShutdownActorMixin(testutil.ActorTestMixin):
         arv_node = testutil.arvados_node_mock(61)
         self.make_mocks(cloud_node, arv_node, shutdown_open=False)
         self.make_actor(cancellable=True)
+        self.shutdown_actor.cancel_shutdown("test")
         self.check_success_flag(False, 2)
         self.assertFalse(self.arvados_client.nodes().update.called)
 
diff --git a/services/nodemanager/tests/test_computenode_dispatch_slurm.py b/services/nodemanager/tests/test_computenode_dispatch_slurm.py
index 135b817..a6d6643 100644
--- a/services/nodemanager/tests/test_computenode_dispatch_slurm.py
+++ b/services/nodemanager/tests/test_computenode_dispatch_slurm.py
@@ -60,19 +60,19 @@ class SLURMComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
         self.check_success_flag(True)
         self.assertFalse(proc_mock.called)
 
-    def test_node_undrained_when_shutdown_window_closes(self, proc_mock):
-        proc_mock.side_effect = iter(['drng\n', 'idle\n'])
-        self.make_mocks(arvados_node=testutil.arvados_node_mock(job_uuid=True))
-        self.make_actor()
-        self.check_success_flag(False, 2)
-        self.check_slurm_got_args(proc_mock, 'NodeName=compute99', 'State=RESUME')
-
-    def test_alloc_node_undrained_when_shutdown_window_closes(self, proc_mock):
-        proc_mock.side_effect = iter(['alloc\n'])
-        self.make_mocks(arvados_node=testutil.arvados_node_mock(job_uuid=True))
-        self.make_actor()
-        self.check_success_flag(False, 2)
-        self.check_slurm_got_args(proc_mock, 'sinfo', '--noheader', '-o', '%t', '-n', 'compute99')
+    # def test_node_undrained_when_shutdown_window_closes(self, proc_mock):
+    #     proc_mock.side_effect = iter(['drng\n', 'idle\n'])
+    #     self.make_mocks(arvados_node=testutil.arvados_node_mock(job_uuid=True))
+    #     self.make_actor()
+    #     self.check_success_flag(False, 2)
+    #     self.check_slurm_got_args(proc_mock, 'NodeName=compute99', 'State=RESUME')
+
+    # def test_alloc_node_undrained_when_shutdown_window_closes(self, proc_mock):
+    #     proc_mock.side_effect = iter(['alloc\n'])
+    #     self.make_mocks(arvados_node=testutil.arvados_node_mock(job_uuid=True))
+    #     self.make_actor()
+    #     self.check_success_flag(False, 2)
+    #     self.check_slurm_got_args(proc_mock, 'sinfo', '--noheader', '-o', '%t', '-n', 'compute99')
 
     def test_cancel_shutdown_retry(self, proc_mock):
         proc_mock.side_effect = iter([OSError, 'drain\n', OSError, 'idle\n'])
@@ -114,33 +114,6 @@ class SLURMComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
         self.node_actor.subscribe(self.subscriber).get(self.TIMEOUT)
 
     @mock.patch("subprocess.check_output")
-    def test_resume_node(self, check_output):
-        arv_node = testutil.arvados_node_mock()
-        self.make_actor(arv_node=arv_node)
-        check_output.return_value = "drain\n"
-        self.node_actor.resume_node().get(self.TIMEOUT)
-        self.assertIn(mock.call(['sinfo', '--noheader', '-o', '%t', '-n', arv_node['hostname']]), check_output.call_args_list)
-        self.assertIn(mock.call(['scontrol', 'update', 'NodeName=' + arv_node['hostname'], 'State=RESUME']), check_output.call_args_list)
-
-    @mock.patch("subprocess.check_output")
-    def test_no_resume_idle_node(self, check_output):
-        arv_node = testutil.arvados_node_mock()
-        self.make_actor(arv_node=arv_node)
-        check_output.return_value = "idle\n"
-        self.node_actor.resume_node().get(self.TIMEOUT)
-        self.assertIn(mock.call(['sinfo', '--noheader', '-o', '%t', '-n', arv_node['hostname']]), check_output.call_args_list)
-        self.assertNotIn(mock.call(['scontrol', 'update', 'NodeName=' + arv_node['hostname'], 'State=RESUME']), check_output.call_args_list)
-
-    @mock.patch("subprocess.check_output")
-    def test_resume_node_exception(self, check_output):
-        arv_node = testutil.arvados_node_mock()
-        self.make_actor(arv_node=arv_node)
-        check_output.side_effect = Exception()
-        self.node_actor.resume_node().get(self.TIMEOUT)
-        self.assertIn(mock.call(['sinfo', '--noheader', '-o', '%t', '-n', arv_node['hostname']]), check_output.call_args_list)
-        self.assertNotIn(mock.call(['scontrol', 'update', 'NodeName=' + arv_node['hostname'], 'State=RESUME']), check_output.call_args_list)
-
-    @mock.patch("subprocess.check_output")
     def test_shutdown_down_node(self, check_output):
         check_output.return_value = "down\n"
         self.make_actor(arv_node=testutil.arvados_node_mock())

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list