[ARVADOS] created: 054f95044461c08fd5fb6cd983d1e8ea1dc62ea8
Git user
git at public.curoverse.com
Tue Mar 8 11:30:04 EST 2016
at 054f95044461c08fd5fb6cd983d1e8ea1dc62ea8 (commit)
commit 054f95044461c08fd5fb6cd983d1e8ea1dc62ea8
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Mar 3 13:54:30 2016 -0500
8543: Implement TellActorProxy which uses Actor.tell() instead of Actor.ask().
Implement BaseNodeManagerActor and TellableActorRef to provide tell_proxy().
Convert Node Manager proxy calls whose returned Futures are never consumed
to use TellActorProxy.
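
[Editorial note, not part of the patch: the sketch below illustrates, under
pykka 1.x semantics, why ask()-based proxy calls can swallow exceptions while
tell()-based delivery routes them to on_failure(). It reuses the same
'pykka_call' message format that _TellCallableProxy sends in this patch; the
Worker actor itself is a made-up example.]

    import pykka

    class Worker(pykka.ThreadingActor):
        def explode(self):
            raise RuntimeError("boom")

        def on_failure(self, exception_type, exception_value, tb):
            # pykka invokes this only when an exception escapes the actor's
            # message loop, which happens for tell()-style deliveries.
            print("on_failure saw: %r" % (exception_value,))

    ref = Worker.start()

    # ask()-style call: the exception is captured in the returned Future.
    # Nobody keeps the Future or calls get() on it, so the error vanishes.
    ref.proxy().explode()

    # tell()-style delivery: no Future exists, so the exception escapes the
    # message loop, is logged, and on_failure() runs (stopping the actor).
    ref.tell({'command': 'pykka_call',
              'attr_path': ('explode',),
              'args': [],
              'kwargs': {}})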
diff --git a/services/nodemanager/arvnodeman/baseactor.py b/services/nodemanager/arvnodeman/baseactor.py
new file mode 100644
index 0000000..9591b42
--- /dev/null
+++ b/services/nodemanager/arvnodeman/baseactor.py
@@ -0,0 +1,85 @@
+from __future__ import absolute_import, print_function
+
+import errno
+import logging
+import os
+import threading
+import traceback
+
+import pykka
+
+class _TellCallableProxy(object):
+    """Internal helper class for proxying callables."""
+
+    def __init__(self, ref, attr_path):
+        self.actor_ref = ref
+        self._attr_path = attr_path
+
+    def __call__(self, *args, **kwargs):
+        message = {
+            'command': 'pykka_call',
+            'attr_path': self._attr_path,
+            'args': args,
+            'kwargs': kwargs,
+        }
+        self.actor_ref.tell(message)
+
+
+class TellActorProxy(pykka.ActorProxy):
+    """ActorProxy in which all calls are implemented using tell().
+
+    The standard pykka.ActorProxy always uses ask() and returns a Future. If
+    the target method raises an exception, it is placed in the Future object
+    and re-raised when get() is called on the Future. Unfortunately, most
+    messaging in Node Manager is asynchronous and the caller does not store the
+    Future object returned by the call to ActorProxy. As a result, exceptions
+    resulting from these calls end up in limbo, neither reported in the logs
+    nor handled by on_failure().
+
+    The TellActorProxy uses tell() instead of ask() and does not return a
+    Future object. As a result, if the target method raises an exception, it
+    will be logged and on_failure() will be called as intended.
+
+    """
+
+    def __repr__(self):
+        return '<ActorProxy for %s, attr_path=%s>' % (
+            self.actor_ref, self._attr_path)
+
+    def __getattr__(self, name):
+        """Get a callable from the actor."""
+        attr_path = self._attr_path + (name,)
+        if attr_path not in self._known_attrs:
+            self._known_attrs = self._get_attributes()
+        attr_info = self._known_attrs.get(attr_path)
+        if attr_info is None:
+            raise AttributeError('%s has no attribute "%s"' % (self, name))
+        if attr_info['callable']:
+            if attr_path not in self._callable_proxies:
+                self._callable_proxies[attr_path] = _TellCallableProxy(
+                    self.actor_ref, attr_path)
+            return self._callable_proxies[attr_path]
+        else:
+            raise AttributeError('attribute "%s" is not a callable on %s' % (name, self))
+
+class TellableActorRef(pykka.ActorRef):
+    """ActorRef adding the tell_proxy() method to get TellActorProxy."""
+
+    def tell_proxy(self):
+        return TellActorProxy(self)
+
+class BaseNodeManagerActor(pykka.ThreadingActor):
+    """Base class for actors in node manager, redefining actor_ref as a
+    TellableActorRef and providing a default on_failure handler.
+    """
+
+    def __init__(self, *args, **kwargs):
+        super(pykka.ThreadingActor, self).__init__(*args, **kwargs)
+        self.actor_ref = TellableActorRef(self)
+
+    def on_failure(self, exception_type, exception_value, tb):
+        lg = getattr(self, "_logger", logging)
+        if (exception_type in (threading.ThreadError, MemoryError) or
+            exception_type is OSError and exception_value.errno == errno.ENOMEM):
+            lg.critical("Unhandled exception is a fatal error, killing Node Manager")
+            os.killpg(os.getpgid(0), 9)
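
[Editorial note: taken together, the three classes above give actors a
second, fire-and-forget calling convention. A minimal usage sketch follows;
the PingActor class is illustrative, not part of the patch, and the pattern
mirrors the converted call sites and updated tests below.]

    from arvnodeman.baseactor import BaseNodeManagerActor

    class PingActor(BaseNodeManagerActor):
        def ping(self):
            print("ping handled asynchronously")

    ref = PingActor.start()    # Actor.start() returns the TellableActorRef
    later = ref.tell_proxy()   # TellActorProxy instead of the ask()-based proxy
    later.ping()               # returns None immediately; no Future is created
    ref.stop(block=True)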
diff --git a/services/nodemanager/arvnodeman/clientactor.py b/services/nodemanager/arvnodeman/clientactor.py
index 9a9ce58..e130749 100644
--- a/services/nodemanager/arvnodeman/clientactor.py
+++ b/services/nodemanager/arvnodeman/clientactor.py
@@ -38,7 +38,7 @@ class RemotePollLoopActor(actor_class):
         super(RemotePollLoopActor, self).__init__()
         self._client = client
         self._timer = timer_actor
-        self._later = self.actor_ref.proxy()
+        self._later = self.actor_ref.tell_proxy()
         self._polling_started = False
         self.min_poll_wait = poll_wait
         self.max_poll_wait = max_poll_wait
diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
index 2ae4fc8..e11dcc7 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
@@ -26,7 +26,7 @@ class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
         super(ComputeNodeStateChangeBase, self).__init__()
         RetryMixin.__init__(self, retry_wait, max_retry_wait,
                             None, cloud_client, timer_actor)
-        self._later = self.actor_ref.proxy()
+        self._later = self.actor_ref.tell_proxy()
         self._arvados = arvados_client
         self.subscribers = set()
 
@@ -37,6 +37,8 @@ class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
         self._set_logger()
 
     def _finished(self):
+        if self.subscribers is None:
+            raise Exception("Actor tried to finish twice")
         _notify_subscribers(self._later, self.subscribers)
         self.subscribers = None
         self._logger.info("finished")
@@ -225,6 +227,7 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
         if not self._cloud.destroy_node(self.cloud_node):
             if self._cloud.broken(self.cloud_node):
                 self._later.cancel_shutdown(self.NODE_BROKEN)
+                return
             else:
                 # Force a retry.
                 raise cloud_types.LibcloudError("destroy_node failed")
@@ -304,7 +307,7 @@ class ComputeNodeMonitorActor(config.actor_class):
                  boot_fail_after=1800
     ):
         super(ComputeNodeMonitorActor, self).__init__()
-        self._later = self.actor_ref.proxy()
+        self._later = self.actor_ref.tell_proxy()
         self._last_log = None
         self._shutdowns = shutdown_timer
         self._cloud_node_fqdn = cloud_fqdn_func
diff --git a/services/nodemanager/arvnodeman/config.py b/services/nodemanager/arvnodeman/config.py
index dcfe1ce..15891a9 100644
--- a/services/nodemanager/arvnodeman/config.py
+++ b/services/nodemanager/arvnodeman/config.py
@@ -12,7 +12,7 @@ import httplib2
 import pykka
 from apiclient import errors as apierror
 
-from .fullstopactor import FullStopActor
+from .baseactor import BaseNodeManagerActor
 
 # IOError is the base class for socket.error, ssl.SSLError, and friends.
 # It seems like it hits the sweet spot for operations we want to retry:
@@ -20,7 +20,7 @@ from .fullstopactor import FullStopActor
 NETWORK_ERRORS = (IOError,)
 ARVADOS_ERRORS = NETWORK_ERRORS + (apierror.Error,)
 
-actor_class = FullStopActor
+actor_class = BaseNodeManagerActor
 
 class NodeManagerConfig(ConfigParser.SafeConfigParser):
     """Node Manager Configuration class.
diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index 0993c47..33b6cd5 100644
--- a/services/nodemanager/arvnodeman/daemon.py
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -121,7 +121,7 @@ class NodeManagerDaemonActor(actor_class):
         self._new_arvados = arvados_factory
         self._new_cloud = cloud_factory
         self._cloud_driver = self._new_cloud()
-        self._later = self.actor_ref.proxy()
+        self._later = self.actor_ref.tell_proxy()
         self.shutdown_windows = shutdown_windows
         self.server_calculator = server_calculator
         self.min_cloud_size = self.server_calculator.cheapest_size()
@@ -174,11 +174,12 @@ class NodeManagerDaemonActor(actor_class):
             poll_stale_after=self.poll_stale_after,
             node_stale_after=self.node_stale_after,
             cloud_client=self._cloud_driver,
-            boot_fail_after=self.boot_fail_after).proxy()
-        actor.subscribe(self._later.node_can_shutdown)
+            boot_fail_after=self.boot_fail_after)
+        actorTell = actor.tell_proxy()
+        actorTell.subscribe(self._later.node_can_shutdown)
         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
-                                             actor.update_cloud_node)
-        record = _ComputeNodeRecord(actor, cloud_node)
+                                             actorTell.update_cloud_node)
+        record = _ComputeNodeRecord(actor.proxy(), cloud_node)
         return record
 
     def update_cloud_nodes(self, nodelist):
@@ -360,7 +361,7 @@ class NodeManagerDaemonActor(actor_class):
             arvados_client=self._new_arvados(),
             arvados_node=arvados_node,
             cloud_client=self._new_cloud(),
-            cloud_size=cloud_size).proxy()
+            cloud_size=cloud_size).tell_proxy()
         self.booting[new_setup.actor_ref.actor_urn] = new_setup
         self.sizes_booting_shutdown[new_setup.actor_ref.actor_urn] = cloud_size
 
@@ -413,7 +414,7 @@ class NodeManagerDaemonActor(actor_class):
             node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
         self.shutdowns[cloud_node_id] = shutdown
         self.sizes_booting_shutdown[cloud_node_id] = cloud_node_obj.size
-        shutdown.subscribe(self._later.node_finished_shutdown)
+        shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)
 
     @_check_poll_freshness
     def node_can_shutdown(self, node_actor):
@@ -438,12 +439,10 @@ class NodeManagerDaemonActor(actor_class):
         if not success:
             if cancel_reason == self._node_shutdown.NODE_BROKEN:
                 self.cloud_nodes.blacklist(cloud_node_id)
-                del self.shutdowns[cloud_node_id]
-                del self.sizes_booting_shutdown[cloud_node_id]
         elif cloud_node_id in self.booted:
             self.booted.pop(cloud_node_id).actor.stop()
-            del self.shutdowns[cloud_node_id]
-            del self.sizes_booting_shutdown[cloud_node_id]
+        del self.shutdowns[cloud_node_id]
+        del self.sizes_booting_shutdown[cloud_node_id]
 
     def shutdown(self):
         self._logger.info("Shutting down after signal.")
diff --git a/services/nodemanager/arvnodeman/fullstopactor.py b/services/nodemanager/arvnodeman/fullstopactor.py
deleted file mode 100644
index 07e0625..0000000
--- a/services/nodemanager/arvnodeman/fullstopactor.py
+++ /dev/null
@@ -1,17 +0,0 @@
-from __future__ import absolute_import, print_function
-
-import errno
-import logging
-import os
-import threading
-import traceback
-
-import pykka
-
-class FullStopActor(pykka.ThreadingActor):
-    def on_failure(self, exception_type, exception_value, tb):
-        lg = getattr(self, "_logger", logging)
-        if (exception_type in (threading.ThreadError, MemoryError) or
-            exception_type is OSError and exception_value.errno == errno.ENOMEM):
-            lg.critical("Unhandled exception is a fatal error, killing Node Manager")
-            os.killpg(os.getpgid(0), 9)
diff --git a/services/nodemanager/arvnodeman/launcher.py b/services/nodemanager/arvnodeman/launcher.py
index c8b3d19..78bd2db 100644
--- a/services/nodemanager/arvnodeman/launcher.py
+++ b/services/nodemanager/arvnodeman/launcher.py
@@ -69,14 +69,14 @@ def launch_pollers(config, server_calculator):
     poll_time = config.getint('Daemon', 'poll_time')
     max_poll_time = config.getint('Daemon', 'max_poll_time')
 
-    timer = TimedCallBackActor.start(poll_time / 10.0).proxy()
+    timer = TimedCallBackActor.start(poll_time / 10.0).tell_proxy()
     cloud_node_poller = CloudNodeListMonitorActor.start(
-        config.new_cloud_client(), timer, poll_time, max_poll_time).proxy()
+        config.new_cloud_client(), timer, poll_time, max_poll_time).tell_proxy()
     arvados_node_poller = ArvadosNodeListMonitorActor.start(
-        config.new_arvados_client(), timer, poll_time, max_poll_time).proxy()
+        config.new_arvados_client(), timer, poll_time, max_poll_time).tell_proxy()
     job_queue_poller = JobQueueMonitorActor.start(
         config.new_arvados_client(), timer, server_calculator,
-        poll_time, max_poll_time).proxy()
+        poll_time, max_poll_time).tell_proxy()
     return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
 
 _caught_signals = {}
@@ -110,7 +110,7 @@ def main(args=None):
     server_calculator = build_server_calculator(config)
     timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
         launch_pollers(config, server_calculator)
-    cloud_node_updater = node_update.start(config.new_cloud_client).proxy()
+    cloud_node_updater = node_update.start(config.new_cloud_client).tell_proxy()
     node_daemon = NodeManagerDaemonActor.start(
         job_queue_poller, arvados_node_poller, cloud_node_poller,
         cloud_node_updater, timer,
@@ -123,7 +123,7 @@ def main(args=None):
         config.getint('Daemon', 'boot_fail_after'),
         config.getint('Daemon', 'node_stale_after'),
         node_setup, node_shutdown, node_monitor,
-        max_total_price=config.getfloat('Daemon', 'max_total_price')).proxy()
+        max_total_price=config.getfloat('Daemon', 'max_total_price')).tell_proxy()
 
     signal.pause()
     daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
diff --git a/services/nodemanager/arvnodeman/timedcallback.py b/services/nodemanager/arvnodeman/timedcallback.py
index 615f798..12d6280 100644
--- a/services/nodemanager/arvnodeman/timedcallback.py
+++ b/services/nodemanager/arvnodeman/timedcallback.py
@@ -18,7 +18,7 @@ class TimedCallBackActor(actor_class):
"""
def __init__(self, max_sleep=1):
super(TimedCallBackActor, self).__init__()
- self._proxy = self.actor_ref.proxy()
+ self._proxy = self.actor_ref.tell_proxy()
self.messages = []
self.max_sleep = max_sleep
diff --git a/services/nodemanager/tests/test_daemon.py b/services/nodemanager/tests/test_daemon.py
index f41fa6c..554fb88 100644
--- a/services/nodemanager/tests/test_daemon.py
+++ b/services/nodemanager/tests/test_daemon.py
@@ -26,6 +26,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
             cloud_size=get_cloud_size,
             actor_ref=mock_actor)
         mock_actor.proxy.return_value = mock_proxy
+        mock_actor.tell_proxy.return_value = mock_proxy
         self.last_setup = mock_proxy
         return mock_actor
 
diff --git a/services/nodemanager/tests/test_failure.py b/services/nodemanager/tests/test_failure.py
index afebb9c..35605fc 100644
--- a/services/nodemanager/tests/test_failure.py
+++ b/services/nodemanager/tests/test_failure.py
@@ -12,9 +12,9 @@ import pykka
 
 from . import testutil
 
-import arvnodeman.fullstopactor
+import arvnodeman.baseactor
 
-class BogusActor(arvnodeman.fullstopactor.FullStopActor):
+class BogusActor(arvnodeman.baseactor.BaseNodeManagerActor):
     def __init__(self, e):
         super(BogusActor, self).__init__()
         self.exp = e
@@ -23,26 +23,17 @@ class BogusActor(arvnodeman.fullstopactor.FullStopActor):
         raise self.exp
 
 class ActorUnhandledExceptionTest(unittest.TestCase):
-    def test1(self):
+    def test_fatal_error(self):
         for e in (MemoryError(), threading.ThreadError(), OSError(errno.ENOMEM, "")):
             with mock.patch('os.killpg') as killpg_mock:
-                act = BogusActor.start(e)
-                act.tell({
-                    'command': 'pykka_call',
-                    'attr_path': ("doStuff",),
-                    'args': [],
-                    'kwargs': {}
-                })
-                act.stop(block=True)
+                act = BogusActor.start(e).tell_proxy()
+                act.doStuff()
+                act.actor_ref.stop(block=True)
                 self.assertTrue(killpg_mock.called)
 
+    def test_nonfatal_error(self):
         with mock.patch('os.killpg') as killpg_mock:
-            act = BogusActor.start(OSError(errno.ENOENT, ""))
-            act.tell({
-                'command': 'pykka_call',
-                'attr_path': ("doStuff",),
-                'args': [],
-                'kwargs': {}
-            })
-            act.stop(block=True)
+            act = BogusActor.start(OSError(errno.ENOENT, "")).tell_proxy()
+            act.doStuff()
+            act.actor_ref.stop(block=True)
             self.assertFalse(killpg_mock.called)
diff --git a/services/nodemanager/tests/testutil.py b/services/nodemanager/tests/testutil.py
index 6cde766..5803b05 100644
--- a/services/nodemanager/tests/testutil.py
+++ b/services/nodemanager/tests/testutil.py
@@ -85,7 +85,10 @@ class MockTimer(object):
             to_deliver = self.messages
             self.messages = []
         for callback, args, kwargs in to_deliver:
-            try:
-                callback(*args, **kwargs)
-            except pykka.ActorDeadError:
-                pass
 
     def schedule(self, want_time, callback, *args, **kwargs):
         with self.lock:
-----------------------------------------------------------------------
hooks/post-receive