[ARVADOS] created: 0934e5663b3e3dc0567ecfc71423d970a313578a
Git user
git at public.curoverse.com
Tue May 17 11:15:22 EDT 2016
at 0934e5663b3e3dc0567ecfc71423d970a313578a (commit)
commit 0934e5663b3e3dc0567ecfc71423d970a313578a
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue May 17 11:15:16 2016 -0400
8236: Add watchdog actor. The watchdog calls ping() on every other actor to check
that each one is responsive. If an actor fails to respond within the timeout, the watchdog kills Node Manager.
diff --git a/services/nodemanager/arvnodeman/baseactor.py b/services/nodemanager/arvnodeman/baseactor.py
index 9591b42..840ba4c 100644
--- a/services/nodemanager/arvnodeman/baseactor.py
+++ b/services/nodemanager/arvnodeman/baseactor.py
@@ -3,6 +3,8 @@ from __future__ import absolute_import, print_function
import errno
import logging
import os
+import signal
+import time
import threading
import traceback
@@ -82,4 +84,39 @@ class BaseNodeManagerActor(pykka.ThreadingActor):
if (exception_type in (threading.ThreadError, MemoryError) or
exception_type is OSError and exception_value.errno == errno.ENOMEM):
lg.critical("Unhandled exception is a fatal error, killing Node Manager")
- os.killpg(os.getpgid(0), 9)
+ os.kill(os.getpid(), signal.SIGQUIT)
+
+ def ping(self):
+ return True
+
+
+class WatchdogActor(pykka.ThreadingActor):
+ def __init__(self, timeout, *args, **kwargs):
+ super(pykka.ThreadingActor, self).__init__(*args, **kwargs)
+ self.timeout = timeout
+ self.actor_ref = TellableActorRef(self)
+ self._later = self.actor_ref.tell_proxy()
+
+ def kill_self(self, act):
+ lg = getattr(self, "_logger", logging)
+ lg.critical("Actor %s watchdog ping time out, killing Node Manager", act)
+ os.kill(os.getpid(), signal.SIGQUIT)
+
+ def on_start(self):
+ self._later.run()
+
+ def run(self):
+ actors = pykka.ActorRegistry.get_all()
+ for a in actors:
+ if a.actor_class is WatchdogActor:
+ continue
+ try:
+ a.proxy().ping().get(self.timeout)
+ except pykka.ActorDeadError:
+ pass
+ except pykka.Timeout:
+ self.kill_self(a)
+ return
+
+ time.sleep(20)
+ self._later.run()
diff --git a/services/nodemanager/arvnodeman/config.py b/services/nodemanager/arvnodeman/config.py
index 15891a9..30c82e7 100644
--- a/services/nodemanager/arvnodeman/config.py
+++ b/services/nodemanager/arvnodeman/config.py
@@ -44,7 +44,8 @@ class NodeManagerConfig(ConfigParser.SafeConfigParser):
'poll_stale_after': '600',
'max_total_price': '0',
'boot_fail_after': str(sys.maxint),
- 'node_stale_after': str(60 * 60 * 2)},
+ 'node_stale_after': str(60 * 60 * 2),
+ 'watchdog': 600},
'Logging': {'file': '/dev/stderr',
'level': 'WARNING'},
}.iteritems():
diff --git a/services/nodemanager/arvnodeman/launcher.py b/services/nodemanager/arvnodeman/launcher.py
index 78bd2db..4554c4c 100644
--- a/services/nodemanager/arvnodeman/launcher.py
+++ b/services/nodemanager/arvnodeman/launcher.py
@@ -12,6 +12,7 @@ import daemon
import pykka
from . import config as nmconfig
+from .baseactor import WatchdogActor
from .daemon import NodeManagerDaemonActor
from .jobqueue import JobQueueMonitorActor, ServerCalculator
from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
@@ -125,6 +126,8 @@ def main(args=None):
node_setup, node_shutdown, node_monitor,
max_total_price=config.getfloat('Daemon', 'max_total_price')).tell_proxy()
+ WatchdogActor.start(config.getint('Daemon', 'watchdog'))
+
signal.pause()
daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
while not daemon_stopped():
diff --git a/services/nodemanager/tests/test_failure.py b/services/nodemanager/tests/test_failure.py
index 35605fc..f30ea15 100644
--- a/services/nodemanager/tests/test_failure.py
+++ b/services/nodemanager/tests/test_failure.py
@@ -4,6 +4,7 @@ from __future__ import absolute_import, print_function
import errno
import logging
+import time
import threading
import unittest
@@ -22,18 +23,30 @@ class BogusActor(arvnodeman.baseactor.BaseNodeManagerActor):
def doStuff(self):
raise self.exp
+ def ping(self):
+ time.sleep(2)
+
class ActorUnhandledExceptionTest(unittest.TestCase):
def test_fatal_error(self):
for e in (MemoryError(), threading.ThreadError(), OSError(errno.ENOMEM, "")):
- with mock.patch('os.killpg') as killpg_mock:
+ with mock.patch('os.kill') as kill_mock:
act = BogusActor.start(e).tell_proxy()
act.doStuff()
act.actor_ref.stop(block=True)
- self.assertTrue(killpg_mock.called)
-
- def test_nonfatal_error(self):
- with mock.patch('os.killpg') as killpg_mock:
- act = BogusActor.start(OSError(errno.ENOENT, "")).tell_proxy()
- act.doStuff()
- act.actor_ref.stop(block=True)
- self.assertFalse(killpg_mock.called)
+ self.assertTrue(kill_mock.called)
+
+ @mock.patch('os.kill')
+ def test_nonfatal_error(self, kill_mock):
+ act = BogusActor.start(OSError(errno.ENOENT, "")).tell_proxy()
+ act.doStuff()
+ act.actor_ref.stop(block=True)
+ self.assertFalse(kill_mock.called)
+
+class WatchdogActorTest(unittest.TestCase):
+ @mock.patch('os.kill')
+ def test_time_timout(self, kill_mock):
+ act = BogusActor.start(OSError(errno.ENOENT, ""))
+ watch = arvnodeman.baseactor.WatchdogActor.start(1)
+ watch.stop(block=True)
+ act.stop(block=True)
+ self.assertTrue(kill_mock.called)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list