[ARVADOS] created: 054f95044461c08fd5fb6cd983d1e8ea1dc62ea8

Git user git at public.curoverse.com
Tue Mar 8 11:30:04 EST 2016


        at  054f95044461c08fd5fb6cd983d1e8ea1dc62ea8 (commit)


commit 054f95044461c08fd5fb6cd983d1e8ea1dc62ea8
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Mar 3 13:54:30 2016 -0500

    8543: Implement TellActorProxy which uses Actor.tell() instead of Actor.ask().
    Implement BaseNodeManagerActor and TellableActorRef to provide tell_proxy().
    Convert proxies in Node Manager that don't return Futures to use
    TellActorProxy.

diff --git a/services/nodemanager/arvnodeman/baseactor.py b/services/nodemanager/arvnodeman/baseactor.py
new file mode 100644
index 0000000..9591b42
--- /dev/null
+++ b/services/nodemanager/arvnodeman/baseactor.py
@@ -0,0 +1,85 @@
+from __future__ import absolute_import, print_function
+
+import errno
+import logging
+import os
+import threading
+import traceback
+
+import pykka
+
+class _TellCallableProxy(object):
+    """Internal helper class for proxying callables."""
+
+    def __init__(self, ref, attr_path):
+        self.actor_ref = ref
+        self._attr_path = attr_path
+
+    def __call__(self, *args, **kwargs):
+        message = {
+            'command': 'pykka_call',
+            'attr_path': self._attr_path,
+            'args': args,
+            'kwargs': kwargs,
+        }
+        self.actor_ref.tell(message)
+
+
+class TellActorProxy(pykka.ActorProxy):
+    """ActorProxy in which all calls are implemented as using tell().
+
+    The standard pykka.ActorProxy always uses ask() and returns a Future.  If
+    the target method raises an exception, it is placed in the Future object
+    and re-raised when get() is called on the Future.  Unfortunately, most
+    messaging in Node Manager is asynchronous and the caller does not store the
+    Future object returned by the call to ActorProxy.  As a result, exceptions
+    resulting from these calls end up in limbo, neither reported in the logs
+    nor handled by on_failure().
+
+    The TellActorProxy uses tell() instead of ask() and does not return a
+    Future object.  As a result, if the target method raises an exception, it
+    will be logged and on_failure() will be called as intended.
+
+    """
+
+    def __repr__(self):
+        return '<ActorProxy for %s, attr_path=%s>' % (
+            self.actor_ref, self._attr_path)
+
+    def __getattr__(self, name):
+        """Get a callable from the actor."""
+        attr_path = self._attr_path + (name,)
+        if attr_path not in self._known_attrs:
+            self._known_attrs = self._get_attributes()
+        attr_info = self._known_attrs.get(attr_path)
+        if attr_info is None:
+            raise AttributeError('%s has no attribute "%s"' % (self, name))
+        if attr_info['callable']:
+            if attr_path not in self._callable_proxies:
+                self._callable_proxies[attr_path] = _TellCallableProxy(
+                    self.actor_ref, attr_path)
+            return self._callable_proxies[attr_path]
+        else:
+            raise AttributeError('attribute "%s" is not a callable on %s' % (name, self))
+
+class TellableActorRef(pykka.ActorRef):
+    """ActorRef adding the tell_proxy() method to get TellActorProxy."""
+
+    def tell_proxy(self):
+        return TellActorProxy(self)
+
+class BaseNodeManagerActor(pykka.ThreadingActor):
+    """Base class for actors in node manager, redefining actor_ref as a
+    TellableActorRef and providing a default on_failure handler.
+    """
+
+    def __init__(self, *args, **kwargs):
+         super(pykka.ThreadingActor, self).__init__(*args, **kwargs)
+         self.actor_ref = TellableActorRef(self)
+
+    def on_failure(self, exception_type, exception_value, tb):
+        lg = getattr(self, "_logger", logging)
+        if (exception_type in (threading.ThreadError, MemoryError) or
+            exception_type is OSError and exception_value.errno == errno.ENOMEM):
+            lg.critical("Unhandled exception is a fatal error, killing Node Manager")
+            os.killpg(os.getpgid(0), 9)
diff --git a/services/nodemanager/arvnodeman/clientactor.py b/services/nodemanager/arvnodeman/clientactor.py
index 9a9ce58..e130749 100644
--- a/services/nodemanager/arvnodeman/clientactor.py
+++ b/services/nodemanager/arvnodeman/clientactor.py
@@ -38,7 +38,7 @@ class RemotePollLoopActor(actor_class):
         super(RemotePollLoopActor, self).__init__()
         self._client = client
         self._timer = timer_actor
-        self._later = self.actor_ref.proxy()
+        self._later = self.actor_ref.tell_proxy()
         self._polling_started = False
         self.min_poll_wait = poll_wait
         self.max_poll_wait = max_poll_wait
diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
index 2ae4fc8..e11dcc7 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
@@ -26,7 +26,7 @@ class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
         super(ComputeNodeStateChangeBase, self).__init__()
         RetryMixin.__init__(self, retry_wait, max_retry_wait,
                             None, cloud_client, timer_actor)
-        self._later = self.actor_ref.proxy()
+        self._later = self.actor_ref.tell_proxy()
         self._arvados = arvados_client
         self.subscribers = set()
 
@@ -37,6 +37,8 @@ class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
         self._set_logger()
 
     def _finished(self):
+        if self.subscribers is None:
+            raise Exception("Actor tried to finish twice")
         _notify_subscribers(self._later, self.subscribers)
         self.subscribers = None
         self._logger.info("finished")
@@ -225,6 +227,7 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
         if not self._cloud.destroy_node(self.cloud_node):
             if self._cloud.broken(self.cloud_node):
                 self._later.cancel_shutdown(self.NODE_BROKEN)
+                return
             else:
                 # Force a retry.
                 raise cloud_types.LibcloudError("destroy_node failed")
@@ -304,7 +307,7 @@ class ComputeNodeMonitorActor(config.actor_class):
                  boot_fail_after=1800
     ):
         super(ComputeNodeMonitorActor, self).__init__()
-        self._later = self.actor_ref.proxy()
+        self._later = self.actor_ref.tell_proxy()
         self._last_log = None
         self._shutdowns = shutdown_timer
         self._cloud_node_fqdn = cloud_fqdn_func
diff --git a/services/nodemanager/arvnodeman/config.py b/services/nodemanager/arvnodeman/config.py
index dcfe1ce..15891a9 100644
--- a/services/nodemanager/arvnodeman/config.py
+++ b/services/nodemanager/arvnodeman/config.py
@@ -12,7 +12,7 @@ import httplib2
 import pykka
 from apiclient import errors as apierror
 
-from .fullstopactor import FullStopActor
+from .baseactor import BaseNodeManagerActor
 
 # IOError is the base class for socket.error, ssl.SSLError, and friends.
 # It seems like it hits the sweet spot for operations we want to retry:
@@ -20,7 +20,7 @@ from .fullstopactor import FullStopActor
 NETWORK_ERRORS = (IOError,)
 ARVADOS_ERRORS = NETWORK_ERRORS + (apierror.Error,)
 
-actor_class = FullStopActor
+actor_class = BaseNodeManagerActor
 
 class NodeManagerConfig(ConfigParser.SafeConfigParser):
     """Node Manager Configuration class.
diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index 0993c47..33b6cd5 100644
--- a/services/nodemanager/arvnodeman/daemon.py
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -121,7 +121,7 @@ class NodeManagerDaemonActor(actor_class):
         self._new_arvados = arvados_factory
         self._new_cloud = cloud_factory
         self._cloud_driver = self._new_cloud()
-        self._later = self.actor_ref.proxy()
+        self._later = self.actor_ref.tell_proxy()
         self.shutdown_windows = shutdown_windows
         self.server_calculator = server_calculator
         self.min_cloud_size = self.server_calculator.cheapest_size()
@@ -174,11 +174,12 @@ class NodeManagerDaemonActor(actor_class):
             poll_stale_after=self.poll_stale_after,
             node_stale_after=self.node_stale_after,
             cloud_client=self._cloud_driver,
-            boot_fail_after=self.boot_fail_after).proxy()
-        actor.subscribe(self._later.node_can_shutdown)
+            boot_fail_after=self.boot_fail_after)
+        actorTell = actor.tell_proxy()
+        actorTell.subscribe(self._later.node_can_shutdown)
         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
-                                             actor.update_cloud_node)
-        record = _ComputeNodeRecord(actor, cloud_node)
+                                             actorTell.update_cloud_node)
+        record = _ComputeNodeRecord(actor.proxy(), cloud_node)
         return record
 
     def update_cloud_nodes(self, nodelist):
@@ -360,7 +361,7 @@ class NodeManagerDaemonActor(actor_class):
             arvados_client=self._new_arvados(),
             arvados_node=arvados_node,
             cloud_client=self._new_cloud(),
-            cloud_size=cloud_size).proxy()
+            cloud_size=cloud_size).tell_proxy()
         self.booting[new_setup.actor_ref.actor_urn] = new_setup
         self.sizes_booting_shutdown[new_setup.actor_ref.actor_urn] = cloud_size
 
@@ -413,7 +414,7 @@ class NodeManagerDaemonActor(actor_class):
             node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
         self.shutdowns[cloud_node_id] = shutdown
         self.sizes_booting_shutdown[cloud_node_id] = cloud_node_obj.size
-        shutdown.subscribe(self._later.node_finished_shutdown)
+        shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)
 
     @_check_poll_freshness
     def node_can_shutdown(self, node_actor):
@@ -438,12 +439,10 @@ class NodeManagerDaemonActor(actor_class):
         if not success:
             if cancel_reason == self._node_shutdown.NODE_BROKEN:
                 self.cloud_nodes.blacklist(cloud_node_id)
-            del self.shutdowns[cloud_node_id]
-            del self.sizes_booting_shutdown[cloud_node_id]
         elif cloud_node_id in self.booted:
             self.booted.pop(cloud_node_id).actor.stop()
-            del self.shutdowns[cloud_node_id]
-            del self.sizes_booting_shutdown[cloud_node_id]
+        del self.shutdowns[cloud_node_id]
+        del self.sizes_booting_shutdown[cloud_node_id]
 
     def shutdown(self):
         self._logger.info("Shutting down after signal.")
diff --git a/services/nodemanager/arvnodeman/fullstopactor.py b/services/nodemanager/arvnodeman/fullstopactor.py
deleted file mode 100644
index 07e0625..0000000
--- a/services/nodemanager/arvnodeman/fullstopactor.py
+++ /dev/null
@@ -1,17 +0,0 @@
-from __future__ import absolute_import, print_function
-
-import errno
-import logging
-import os
-import threading
-import traceback
-
-import pykka
-
-class FullStopActor(pykka.ThreadingActor):
-    def on_failure(self, exception_type, exception_value, tb):
-        lg = getattr(self, "_logger", logging)
-        if (exception_type in (threading.ThreadError, MemoryError) or
-            exception_type is OSError and exception_value.errno == errno.ENOMEM):
-            lg.critical("Unhandled exception is a fatal error, killing Node Manager")
-            os.killpg(os.getpgid(0), 9)
diff --git a/services/nodemanager/arvnodeman/launcher.py b/services/nodemanager/arvnodeman/launcher.py
index c8b3d19..78bd2db 100644
--- a/services/nodemanager/arvnodeman/launcher.py
+++ b/services/nodemanager/arvnodeman/launcher.py
@@ -69,14 +69,14 @@ def launch_pollers(config, server_calculator):
     poll_time = config.getint('Daemon', 'poll_time')
     max_poll_time = config.getint('Daemon', 'max_poll_time')
 
-    timer = TimedCallBackActor.start(poll_time / 10.0).proxy()
+    timer = TimedCallBackActor.start(poll_time / 10.0).tell_proxy()
     cloud_node_poller = CloudNodeListMonitorActor.start(
-        config.new_cloud_client(), timer, poll_time, max_poll_time).proxy()
+        config.new_cloud_client(), timer, poll_time, max_poll_time).tell_proxy()
     arvados_node_poller = ArvadosNodeListMonitorActor.start(
-        config.new_arvados_client(), timer, poll_time, max_poll_time).proxy()
+        config.new_arvados_client(), timer, poll_time, max_poll_time).tell_proxy()
     job_queue_poller = JobQueueMonitorActor.start(
         config.new_arvados_client(), timer, server_calculator,
-        poll_time, max_poll_time).proxy()
+        poll_time, max_poll_time).tell_proxy()
     return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
 
 _caught_signals = {}
@@ -110,7 +110,7 @@ def main(args=None):
         server_calculator = build_server_calculator(config)
         timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
             launch_pollers(config, server_calculator)
-        cloud_node_updater = node_update.start(config.new_cloud_client).proxy()
+        cloud_node_updater = node_update.start(config.new_cloud_client).tell_proxy()
         node_daemon = NodeManagerDaemonActor.start(
             job_queue_poller, arvados_node_poller, cloud_node_poller,
             cloud_node_updater, timer,
@@ -123,7 +123,7 @@ def main(args=None):
             config.getint('Daemon', 'boot_fail_after'),
             config.getint('Daemon', 'node_stale_after'),
             node_setup, node_shutdown, node_monitor,
-            max_total_price=config.getfloat('Daemon', 'max_total_price')).proxy()
+            max_total_price=config.getfloat('Daemon', 'max_total_price')).tell_proxy()
 
         signal.pause()
         daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
diff --git a/services/nodemanager/arvnodeman/timedcallback.py b/services/nodemanager/arvnodeman/timedcallback.py
index 615f798..12d6280 100644
--- a/services/nodemanager/arvnodeman/timedcallback.py
+++ b/services/nodemanager/arvnodeman/timedcallback.py
@@ -18,7 +18,7 @@ class TimedCallBackActor(actor_class):
     """
     def __init__(self, max_sleep=1):
         super(TimedCallBackActor, self).__init__()
-        self._proxy = self.actor_ref.proxy()
+        self._proxy = self.actor_ref.tell_proxy()
         self.messages = []
         self.max_sleep = max_sleep
 
diff --git a/services/nodemanager/tests/test_daemon.py b/services/nodemanager/tests/test_daemon.py
index f41fa6c..554fb88 100644
--- a/services/nodemanager/tests/test_daemon.py
+++ b/services/nodemanager/tests/test_daemon.py
@@ -26,6 +26,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                           cloud_size=get_cloud_size,
                                           actor_ref=mock_actor)
         mock_actor.proxy.return_value = mock_proxy
+        mock_actor.tell_proxy.return_value = mock_proxy
 
         self.last_setup = mock_proxy
         return mock_actor
diff --git a/services/nodemanager/tests/test_failure.py b/services/nodemanager/tests/test_failure.py
index afebb9c..35605fc 100644
--- a/services/nodemanager/tests/test_failure.py
+++ b/services/nodemanager/tests/test_failure.py
@@ -12,9 +12,9 @@ import pykka
 
 from . import testutil
 
-import arvnodeman.fullstopactor
+import arvnodeman.baseactor
 
-class BogusActor(arvnodeman.fullstopactor.FullStopActor):
+class BogusActor(arvnodeman.baseactor.BaseNodeManagerActor):
     def __init__(self, e):
         super(BogusActor, self).__init__()
         self.exp = e
@@ -23,26 +23,17 @@ class BogusActor(arvnodeman.fullstopactor.FullStopActor):
         raise self.exp
 
 class ActorUnhandledExceptionTest(unittest.TestCase):
-    def test1(self):
+    def test_fatal_error(self):
         for e in (MemoryError(), threading.ThreadError(), OSError(errno.ENOMEM, "")):
             with mock.patch('os.killpg') as killpg_mock:
-                act = BogusActor.start(e)
-                act.tell({
-                    'command': 'pykka_call',
-                    'attr_path': ("doStuff",),
-                    'args': [],
-                    'kwargs': {}
-                })
-                act.stop(block=True)
+                act = BogusActor.start(e).tell_proxy()
+                act.doStuff()
+                act.actor_ref.stop(block=True)
                 self.assertTrue(killpg_mock.called)
 
+    def test_nonfatal_error(self):
         with mock.patch('os.killpg') as killpg_mock:
-            act = BogusActor.start(OSError(errno.ENOENT, ""))
-            act.tell({
-                'command': 'pykka_call',
-                'attr_path': ("doStuff",),
-                'args': [],
-                'kwargs': {}
-            })
-            act.stop(block=True)
+            act = BogusActor.start(OSError(errno.ENOENT, "")).tell_proxy()
+            act.doStuff()
+            act.actor_ref.stop(block=True)
             self.assertFalse(killpg_mock.called)
diff --git a/services/nodemanager/tests/testutil.py b/services/nodemanager/tests/testutil.py
index 6cde766..5803b05 100644
--- a/services/nodemanager/tests/testutil.py
+++ b/services/nodemanager/tests/testutil.py
@@ -85,7 +85,10 @@ class MockTimer(object):
             to_deliver = self.messages
             self.messages = []
         for callback, args, kwargs in to_deliver:
-            callback(*args, **kwargs)
+            try:
+                callback(*args, **kwargs)
+            except pykka.ActorDeadError:
+                pass
 
     def schedule(self, want_time, callback, *args, **kwargs):
         with self.lock:

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list