[ARVADOS] created: 223fd5187a0ecaa9f9c65be2a6733b4f3b56c99e

Git user git at public.curoverse.com
Thu Mar 3 16:41:28 EST 2016


        at  223fd5187a0ecaa9f9c65be2a6733b4f3b56c99e (commit)


commit 223fd5187a0ecaa9f9c65be2a6733b4f3b56c99e
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Mar 3 16:41:18 2016 -0500

    8543: Implement BaseNodeMangerActor and TellableActorRef with tell_proxy().
    Almost all tests pass, fixing remaining.

diff --git a/services/nodemanager/arvnodeman/baseactor.py b/services/nodemanager/arvnodeman/baseactor.py
new file mode 100644
index 0000000..01a05be
--- /dev/null
+++ b/services/nodemanager/arvnodeman/baseactor.py
@@ -0,0 +1,94 @@
+from __future__ import absolute_import, print_function
+
+import errno
+import logging
+import os
+import signal
+import threading
+import traceback
+
+import pykka
+
+class _TellCallableProxy(object):
+    """Internal helper class for proxying callables.
+
+    Calling an instance sends a 'pykka_call' message to the actor with
+    ActorRef.tell(): no future is created and nothing is returned, so
+    the caller does not wait for (or see) the result.
+    """
+
+    def __init__(self, ref, attr_path):
+        self.actor_ref = ref
+        self._attr_path = attr_path
+
+    def __call__(self, *args, **kwargs):
+        message = {
+            'command': 'pykka_call',
+            'attr_path': self._attr_path,
+            'args': args,
+            'kwargs': kwargs,
+        }
+        self.actor_ref.tell(message)
+
+
+class TellActorProxy(pykka.ActorProxy):
+    """Like ActorProxy, except only permits asynchronous stuff.
+
+    Callable attributes of the actor are returned as _TellCallableProxy
+    objects; any other attribute raises AttributeError instead of
+    producing a future.
+    """
+
+    def __repr__(self):
+        return '<TellActorProxy for %s, attr_path=%s>' % (
+            self.actor_ref, self._attr_path)
+
+    def __getattr__(self, name):
+        """Get a fire-and-forget callable from the actor."""
+        attr_path = self._attr_path + (name,)
+        if attr_path not in self._known_attrs:
+            # The cached attribute map may be stale; refresh it before
+            # deciding the attribute does not exist.
+            self._known_attrs = self._get_attributes()
+        attr_info = self._known_attrs.get(attr_path)
+        if attr_info is None:
+            raise AttributeError('%s has no attribute "%s"' % (self, name))
+        if not attr_info['callable']:
+            raise AttributeError(
+                '%s is not a callable attribute of %s' % (name, self))
+        if attr_path not in self._callable_proxies:
+            self._callable_proxies[attr_path] = _TellCallableProxy(
+                self.actor_ref, attr_path)
+        return self._callable_proxies[attr_path]
+
+class TellableActorRef(pykka.ActorRef):
+    """ActorRef that can also hand out a TellActorProxy."""
+
+    def tell_proxy(self):
+        """Return a proxy whose method calls are sent with tell()."""
+        return TellActorProxy(self)
+
+class BaseNodeMangerActor(pykka.ThreadingActor):
+    """Base class for Node Manager actors.
+
+    Installs a TellableActorRef as self.actor_ref so callers can use
+    tell_proxy(), and kills the whole process group on fatal errors.
+    """
+
+    def __init__(self, *args, **kwargs):
+        # Name this class (not ThreadingActor) in super() so that
+        # ThreadingActor's own place in the MRO is not skipped.
+        super(BaseNodeMangerActor, self).__init__(*args, **kwargs)
+        self.actor_ref = TellableActorRef(self)
+
+    def on_failure(self, exception_type, exception_value, tb):
+        """Kill the process group if the unhandled exception is fatal.
+
+        ThreadError, MemoryError and an OSError with errno ENOMEM are
+        treated as fatal; other exceptions are not acted on here.
+        """
+        lg = getattr(self, "_logger", logging)
+        if (exception_type in (threading.ThreadError, MemoryError) or
+            exception_type is OSError and exception_value.errno == errno.ENOMEM):
+            lg.critical("Unhandled exception is a fatal error, killing Node Manager")
+            os.killpg(os.getpgid(0), signal.SIGKILL)
diff --git a/services/nodemanager/arvnodeman/clientactor.py b/services/nodemanager/arvnodeman/clientactor.py
index 66e94af..e130749 100644
--- a/services/nodemanager/arvnodeman/clientactor.py
+++ b/services/nodemanager/arvnodeman/clientactor.py
@@ -8,7 +8,6 @@ import time
 import pykka
 
 from .config import actor_class
-from .tellactorproxy import TellActorProxy
 
 def _notify_subscribers(response, subscribers):
     """Send the response to all the subscriber methods.
@@ -39,7 +38,7 @@ class RemotePollLoopActor(actor_class):
         super(RemotePollLoopActor, self).__init__()
         self._client = client
         self._timer = timer_actor
-        self._later = TellActorProxy(self.actor_ref)
+        self._later = self.actor_ref.tell_proxy()
         self._polling_started = False
         self.min_poll_wait = poll_wait
         self.max_poll_wait = max_poll_wait
diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
index 028ad6f..ef37844 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
@@ -13,7 +13,6 @@ from .. import \
     arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
     arvados_node_missing, RetryMixin
 from ...clientactor import _notify_subscribers
-from ...tellactorproxy import TellActorProxy
 from ... import config
 
 class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
@@ -27,7 +26,7 @@ class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
         super(ComputeNodeStateChangeBase, self).__init__()
         RetryMixin.__init__(self, retry_wait, max_retry_wait,
                             None, cloud_client, timer_actor)
-        self._later = TellActorProxy(self.actor_ref)
+        self._later = self.actor_ref.tell_proxy()
         self._arvados = arvados_client
         self.subscribers = set()
 
@@ -39,7 +38,7 @@ class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
 
     def _finished(self):
         _notify_subscribers(self._later, self.subscribers)
-        self.subscribers = None
+        self.subscribers.clear()
         self._logger.info("finished")
 
     def subscribe(self, subscriber):
@@ -226,6 +225,7 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
         if not self._cloud.destroy_node(self.cloud_node):
             if self._cloud.broken(self.cloud_node):
                 self._later.cancel_shutdown(self.NODE_BROKEN)
+                return
             else:
                 # Force a retry.
                 raise cloud_types.LibcloudError("destroy_node failed")
@@ -305,7 +305,7 @@ class ComputeNodeMonitorActor(config.actor_class):
                  boot_fail_after=1800
     ):
         super(ComputeNodeMonitorActor, self).__init__()
-        self._later = TellActorProxy(self.actor_ref)
+        self._later = self.actor_ref.tell_proxy()
         self._last_log = None
         self._shutdowns = shutdown_timer
         self._cloud_node_fqdn = cloud_fqdn_func
diff --git a/services/nodemanager/arvnodeman/config.py b/services/nodemanager/arvnodeman/config.py
index dcfe1ce..76fb228 100644
--- a/services/nodemanager/arvnodeman/config.py
+++ b/services/nodemanager/arvnodeman/config.py
@@ -12,7 +12,7 @@ import httplib2
 import pykka
 from apiclient import errors as apierror
 
-from .fullstopactor import FullStopActor
+from .baseactor import BaseNodeMangerActor
 
 # IOError is the base class for socket.error, ssl.SSLError, and friends.
 # It seems like it hits the sweet spot for operations we want to retry:
@@ -20,7 +20,7 @@ from .fullstopactor import FullStopActor
 NETWORK_ERRORS = (IOError,)
 ARVADOS_ERRORS = NETWORK_ERRORS + (apierror.Error,)
 
-actor_class = FullStopActor
+actor_class = BaseNodeMangerActor
 
 class NodeManagerConfig(ConfigParser.SafeConfigParser):
     """Node Manager Configuration class.
diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index d3115e1..23804f7 100644
--- a/services/nodemanager/arvnodeman/daemon.py
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -11,12 +11,11 @@ import pykka
 from . import computenode as cnode
 from .computenode import dispatch
 from .config import actor_class
-from .tellactorproxy TellActorProxy
 
 class _ComputeNodeRecord(object):
     def __init__(self, actor=None, cloud_node=None, arvados_node=None,
                  assignment_time=float('-inf')):
-        self.actor = actor.proxy()
+        self.actor = actor
         self.cloud_node = cloud_node
         self.arvados_node = arvados_node
         self.assignment_time = assignment_time
@@ -122,7 +121,7 @@ class NodeManagerDaemonActor(actor_class):
         self._new_arvados = arvados_factory
         self._new_cloud = cloud_factory
         self._cloud_driver = self._new_cloud()
-        self._later = TellActorProxy(self.actor_ref)
+        self._later = self.actor_ref.tell_proxy()
         self.shutdown_windows = shutdown_windows
         self.server_calculator = server_calculator
         self.min_cloud_size = self.server_calculator.cheapest_size()
@@ -176,11 +175,11 @@ class NodeManagerDaemonActor(actor_class):
             node_stale_after=self.node_stale_after,
             cloud_client=self._cloud_driver,
             boot_fail_after=self.boot_fail_after)
-        actorTell = TellActorProxy(actor)
+        actorTell = actor.tell_proxy()
         actorTell.subscribe(self._later.node_can_shutdown)
         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
                                              actorTell.update_cloud_node)
-        record = _ComputeNodeRecord(actor, cloud_node)
+        record = _ComputeNodeRecord(actor.proxy(), cloud_node)
         return record
 
     def update_cloud_nodes(self, nodelist):
@@ -199,10 +198,11 @@ class NodeManagerDaemonActor(actor_class):
         for key, record in self.cloud_nodes.orphans.iteritems():
             if key in self.shutdowns:
                 try:
-                    self.shutdowns[key].stop()
+                    self.shutdowns[key].stop().get()
                 except pykka.ActorDeadError:
                     pass
                 del self.shutdowns[key]
+            if key in self.sizes_booting_shutdown:
                 del self.sizes_booting_shutdown[key]
             record.actor.stop()
             record.cloud_node = None
@@ -357,12 +357,12 @@ class NodeManagerDaemonActor(actor_class):
         arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
         self._logger.info("Want %i more %s nodes.  Booting a node.",
                           nodes_wanted, cloud_size.name)
-        new_setup = TellActorProxy(self._node_setup.start(
+        new_setup = self._node_setup.start(
             timer_actor=self._timer,
             arvados_client=self._new_arvados(),
             arvados_node=arvados_node,
             cloud_client=self._new_cloud(),
-            cloud_size=cloud_size))
+            cloud_size=cloud_size).tell_proxy()
         self.booting[new_setup.actor_ref.actor_urn] = new_setup
         self.sizes_booting_shutdown[new_setup.actor_ref.actor_urn] = cloud_size
 
@@ -409,13 +409,13 @@ class NodeManagerDaemonActor(actor_class):
         cloud_node_id = cloud_node_obj.id
         if cloud_node_id in self.shutdowns:
             return None
-        shutdown = TellActorProxy(self._node_shutdown.start(
+        shutdown = self._node_shutdown.start(
             timer_actor=self._timer, cloud_client=self._new_cloud(),
             arvados_client=self._new_arvados(),
-            node_monitor=node_actor.actor_ref, cancellable=cancellable))
+            node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
         self.shutdowns[cloud_node_id] = shutdown
         self.sizes_booting_shutdown[cloud_node_id] = cloud_node_obj.size
-        shutdown.subscribe(self._later.node_finished_shutdown)
+        shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)
 
     @_check_poll_freshness
     def node_can_shutdown(self, node_actor):
@@ -440,12 +440,9 @@ class NodeManagerDaemonActor(actor_class):
         if not success:
             if cancel_reason == self._node_shutdown.NODE_BROKEN:
                 self.cloud_nodes.blacklist(cloud_node_id)
-            del self.shutdowns[cloud_node_id]
-            del self.sizes_booting_shutdown[cloud_node_id]
         elif cloud_node_id in self.booted:
             self.booted.pop(cloud_node_id).actor.stop()
-            del self.shutdowns[cloud_node_id]
-            del self.sizes_booting_shutdown[cloud_node_id]
+        del self.shutdowns[cloud_node_id]
 
     def shutdown(self):
         self._logger.info("Shutting down after signal.")
diff --git a/services/nodemanager/arvnodeman/fullstopactor.py b/services/nodemanager/arvnodeman/fullstopactor.py
deleted file mode 100644
index 07e0625..0000000
--- a/services/nodemanager/arvnodeman/fullstopactor.py
+++ /dev/null
@@ -1,17 +0,0 @@
-from __future__ import absolute_import, print_function
-
-import errno
-import logging
-import os
-import threading
-import traceback
-
-import pykka
-
-class FullStopActor(pykka.ThreadingActor):
-    def on_failure(self, exception_type, exception_value, tb):
-        lg = getattr(self, "_logger", logging)
-        if (exception_type in (threading.ThreadError, MemoryError) or
-            exception_type is OSError and exception_value.errno == errno.ENOMEM):
-            lg.critical("Unhandled exception is a fatal error, killing Node Manager")
-            os.killpg(os.getpgid(0), 9)
diff --git a/services/nodemanager/arvnodeman/launcher.py b/services/nodemanager/arvnodeman/launcher.py
index 3586c4b..78bd2db 100644
--- a/services/nodemanager/arvnodeman/launcher.py
+++ b/services/nodemanager/arvnodeman/launcher.py
@@ -16,7 +16,6 @@ from .daemon import NodeManagerDaemonActor
 from .jobqueue import JobQueueMonitorActor, ServerCalculator
 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
 from .timedcallback import TimedCallBackActor
-from .tellactorproxy import TellActorProxy
 
 node_daemon = None
 
@@ -70,14 +69,14 @@ def launch_pollers(config, server_calculator):
     poll_time = config.getint('Daemon', 'poll_time')
     max_poll_time = config.getint('Daemon', 'max_poll_time')
 
-    timer = TellActorProxy(TimedCallBackActor.start(poll_time / 10.0))
-    cloud_node_poller = TellActorProxy(CloudNodeListMonitorActor.start(
-        config.new_cloud_client(), timer, poll_time, max_poll_time))
-    arvados_node_poller = TellActorProxy(ArvadosNodeListMonitorActor.start(
-        config.new_arvados_client(), timer, poll_time, max_poll_time))
-    job_queue_poller = TellActorProxy(JobQueueMonitorActor.start(
+    timer = TimedCallBackActor.start(poll_time / 10.0).tell_proxy()
+    cloud_node_poller = CloudNodeListMonitorActor.start(
+        config.new_cloud_client(), timer, poll_time, max_poll_time).tell_proxy()
+    arvados_node_poller = ArvadosNodeListMonitorActor.start(
+        config.new_arvados_client(), timer, poll_time, max_poll_time).tell_proxy()
+    job_queue_poller = JobQueueMonitorActor.start(
         config.new_arvados_client(), timer, server_calculator,
-        poll_time, max_poll_time))
+        poll_time, max_poll_time).tell_proxy()
     return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
 
 _caught_signals = {}
@@ -111,7 +110,7 @@ def main(args=None):
         server_calculator = build_server_calculator(config)
         timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
             launch_pollers(config, server_calculator)
-        cloud_node_updater = TellActorProxy(node_update.start(config.new_cloud_client))
+        cloud_node_updater = node_update.start(config.new_cloud_client).tell_proxy()
         node_daemon = NodeManagerDaemonActor.start(
             job_queue_poller, arvados_node_poller, cloud_node_poller,
             cloud_node_updater, timer,
@@ -124,7 +123,7 @@ def main(args=None):
             config.getint('Daemon', 'boot_fail_after'),
             config.getint('Daemon', 'node_stale_after'),
             node_setup, node_shutdown, node_monitor,
-            max_total_price=config.getfloat('Daemon', 'max_total_price')).proxy()
+            max_total_price=config.getfloat('Daemon', 'max_total_price')).tell_proxy()
 
         signal.pause()
         daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
diff --git a/services/nodemanager/arvnodeman/timedcallback.py b/services/nodemanager/arvnodeman/timedcallback.py
index a89e437..12d6280 100644
--- a/services/nodemanager/arvnodeman/timedcallback.py
+++ b/services/nodemanager/arvnodeman/timedcallback.py
@@ -8,7 +8,6 @@ import time
 import pykka
 
 from .config import actor_class
-from .tellactorproxy import TellActorProxy
 
 class TimedCallBackActor(actor_class):
     """Send messages to other actors on a schedule.
@@ -19,7 +18,7 @@ class TimedCallBackActor(actor_class):
     """
     def __init__(self, max_sleep=1):
         super(TimedCallBackActor, self).__init__()
-        self._proxy = TellActorProxy(self.actor_ref)
+        self._proxy = self.actor_ref.tell_proxy()
         self.messages = []
         self.max_sleep = max_sleep
 
diff --git a/services/nodemanager/tests/test_daemon.py b/services/nodemanager/tests/test_daemon.py
index f41fa6c..554fb88 100644
--- a/services/nodemanager/tests/test_daemon.py
+++ b/services/nodemanager/tests/test_daemon.py
@@ -26,6 +26,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                           cloud_size=get_cloud_size,
                                           actor_ref=mock_actor)
         mock_actor.proxy.return_value = mock_proxy
+        mock_actor.tell_proxy.return_value = mock_proxy
 
         self.last_setup = mock_proxy
         return mock_actor
diff --git a/services/nodemanager/tests/test_failure.py b/services/nodemanager/tests/test_failure.py
index afebb9c..aa497a4 100644
--- a/services/nodemanager/tests/test_failure.py
+++ b/services/nodemanager/tests/test_failure.py
@@ -12,9 +12,9 @@ import pykka
 
 from . import testutil
 
-import arvnodeman.fullstopactor
+import arvnodeman.baseactor
 
-class BogusActor(arvnodeman.fullstopactor.FullStopActor):
+class BogusActor(arvnodeman.baseactor.BaseNodeMangerActor):
     def __init__(self, e):
         super(BogusActor, self).__init__()
         self.exp = e
@@ -23,26 +23,17 @@ class BogusActor(arvnodeman.fullstopactor.FullStopActor):
         raise self.exp
 
 class ActorUnhandledExceptionTest(unittest.TestCase):
-    def test1(self):
+    def test_fatal_error(self):
         for e in (MemoryError(), threading.ThreadError(), OSError(errno.ENOMEM, "")):
             with mock.patch('os.killpg') as killpg_mock:
-                act = BogusActor.start(e)
-                act.tell({
-                    'command': 'pykka_call',
-                    'attr_path': ("doStuff",),
-                    'args': [],
-                    'kwargs': {}
-                })
-                act.stop(block=True)
+                act = BogusActor.start(e).tell_proxy()
+                act.doStuff()
+                act.actor_ref.stop(block=True)
                 self.assertTrue(killpg_mock.called)
 
+    def test_nonfatal_error(self):
         with mock.patch('os.killpg') as killpg_mock:
-            act = BogusActor.start(OSError(errno.ENOENT, ""))
-            act.tell({
-                'command': 'pykka_call',
-                'attr_path': ("doStuff",),
-                'args': [],
-                'kwargs': {}
-            })
-            act.stop(block=True)
+            act = BogusActor.start(OSError(errno.ENOENT, "")).tell_proxy()
+            act.doStuff()
+            act.actor_ref.stop(block=True)
             self.assertFalse(killpg_mock.called)
diff --git a/services/nodemanager/tests/testutil.py b/services/nodemanager/tests/testutil.py
index 6cde766..5803b05 100644
--- a/services/nodemanager/tests/testutil.py
+++ b/services/nodemanager/tests/testutil.py
@@ -85,7 +85,10 @@ class MockTimer(object):
             to_deliver = self.messages
             self.messages = []
         for callback, args, kwargs in to_deliver:
-            callback(*args, **kwargs)
+            try:
+                callback(*args, **kwargs)
+            except pykka.ActorDeadError:
+                pass
 
     def schedule(self, want_time, callback, *args, **kwargs):
         with self.lock:

commit 8cb07346b3b7467cd74b187605ad665fae98a520
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Mar 3 13:54:30 2016 -0500

    8543: Convert most pykka proxies to TellActorProxy

diff --git a/services/nodemanager/arvnodeman/clientactor.py b/services/nodemanager/arvnodeman/clientactor.py
index 9a9ce58..66e94af 100644
--- a/services/nodemanager/arvnodeman/clientactor.py
+++ b/services/nodemanager/arvnodeman/clientactor.py
@@ -8,6 +8,7 @@ import time
 import pykka
 
 from .config import actor_class
+from .tellactorproxy import TellActorProxy
 
 def _notify_subscribers(response, subscribers):
     """Send the response to all the subscriber methods.
@@ -38,7 +39,7 @@ class RemotePollLoopActor(actor_class):
         super(RemotePollLoopActor, self).__init__()
         self._client = client
         self._timer = timer_actor
-        self._later = self.actor_ref.proxy()
+        self._later = TellActorProxy(self.actor_ref)
         self._polling_started = False
         self.min_poll_wait = poll_wait
         self.max_poll_wait = max_poll_wait
diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
index 2ae4fc8..028ad6f 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
@@ -13,6 +13,7 @@ from .. import \
     arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
     arvados_node_missing, RetryMixin
 from ...clientactor import _notify_subscribers
+from ...tellactorproxy import TellActorProxy
 from ... import config
 
 class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
@@ -26,7 +27,7 @@ class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
         super(ComputeNodeStateChangeBase, self).__init__()
         RetryMixin.__init__(self, retry_wait, max_retry_wait,
                             None, cloud_client, timer_actor)
-        self._later = self.actor_ref.proxy()
+        self._later = TellActorProxy(self.actor_ref)
         self._arvados = arvados_client
         self.subscribers = set()
 
@@ -304,7 +305,7 @@ class ComputeNodeMonitorActor(config.actor_class):
                  boot_fail_after=1800
     ):
         super(ComputeNodeMonitorActor, self).__init__()
-        self._later = self.actor_ref.proxy()
+        self._later = TellActorProxy(self.actor_ref)
         self._last_log = None
         self._shutdowns = shutdown_timer
         self._cloud_node_fqdn = cloud_fqdn_func
diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index 0993c47..d3115e1 100644
--- a/services/nodemanager/arvnodeman/daemon.py
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -11,11 +11,12 @@ import pykka
 from . import computenode as cnode
 from .computenode import dispatch
 from .config import actor_class
+from .tellactorproxy TellActorProxy
 
 class _ComputeNodeRecord(object):
     def __init__(self, actor=None, cloud_node=None, arvados_node=None,
                  assignment_time=float('-inf')):
-        self.actor = actor
+        self.actor = actor.proxy()
         self.cloud_node = cloud_node
         self.arvados_node = arvados_node
         self.assignment_time = assignment_time
@@ -121,7 +122,7 @@ class NodeManagerDaemonActor(actor_class):
         self._new_arvados = arvados_factory
         self._new_cloud = cloud_factory
         self._cloud_driver = self._new_cloud()
-        self._later = self.actor_ref.proxy()
+        self._later = TellActorProxy(self.actor_ref)
         self.shutdown_windows = shutdown_windows
         self.server_calculator = server_calculator
         self.min_cloud_size = self.server_calculator.cheapest_size()
@@ -174,10 +175,11 @@ class NodeManagerDaemonActor(actor_class):
             poll_stale_after=self.poll_stale_after,
             node_stale_after=self.node_stale_after,
             cloud_client=self._cloud_driver,
-            boot_fail_after=self.boot_fail_after).proxy()
-        actor.subscribe(self._later.node_can_shutdown)
+            boot_fail_after=self.boot_fail_after)
+        actorTell = TellActorProxy(actor)
+        actorTell.subscribe(self._later.node_can_shutdown)
         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
-                                             actor.update_cloud_node)
+                                             actorTell.update_cloud_node)
         record = _ComputeNodeRecord(actor, cloud_node)
         return record
 
@@ -197,7 +199,7 @@ class NodeManagerDaemonActor(actor_class):
         for key, record in self.cloud_nodes.orphans.iteritems():
             if key in self.shutdowns:
                 try:
-                    self.shutdowns[key].stop().get()
+                    self.shutdowns[key].stop()
                 except pykka.ActorDeadError:
                     pass
                 del self.shutdowns[key]
@@ -355,12 +357,12 @@ class NodeManagerDaemonActor(actor_class):
         arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
         self._logger.info("Want %i more %s nodes.  Booting a node.",
                           nodes_wanted, cloud_size.name)
-        new_setup = self._node_setup.start(
+        new_setup = TellActorProxy(self._node_setup.start(
             timer_actor=self._timer,
             arvados_client=self._new_arvados(),
             arvados_node=arvados_node,
             cloud_client=self._new_cloud(),
-            cloud_size=cloud_size).proxy()
+            cloud_size=cloud_size))
         self.booting[new_setup.actor_ref.actor_urn] = new_setup
         self.sizes_booting_shutdown[new_setup.actor_ref.actor_urn] = cloud_size
 
@@ -407,10 +409,10 @@ class NodeManagerDaemonActor(actor_class):
         cloud_node_id = cloud_node_obj.id
         if cloud_node_id in self.shutdowns:
             return None
-        shutdown = self._node_shutdown.start(
+        shutdown = TellActorProxy(self._node_shutdown.start(
             timer_actor=self._timer, cloud_client=self._new_cloud(),
             arvados_client=self._new_arvados(),
-            node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
+            node_monitor=node_actor.actor_ref, cancellable=cancellable))
         self.shutdowns[cloud_node_id] = shutdown
         self.sizes_booting_shutdown[cloud_node_id] = cloud_node_obj.size
         shutdown.subscribe(self._later.node_finished_shutdown)
diff --git a/services/nodemanager/arvnodeman/launcher.py b/services/nodemanager/arvnodeman/launcher.py
index c8b3d19..3586c4b 100644
--- a/services/nodemanager/arvnodeman/launcher.py
+++ b/services/nodemanager/arvnodeman/launcher.py
@@ -16,6 +16,7 @@ from .daemon import NodeManagerDaemonActor
 from .jobqueue import JobQueueMonitorActor, ServerCalculator
 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
 from .timedcallback import TimedCallBackActor
+from .tellactorproxy import TellActorProxy
 
 node_daemon = None
 
@@ -69,14 +70,14 @@ def launch_pollers(config, server_calculator):
     poll_time = config.getint('Daemon', 'poll_time')
     max_poll_time = config.getint('Daemon', 'max_poll_time')
 
-    timer = TimedCallBackActor.start(poll_time / 10.0).proxy()
-    cloud_node_poller = CloudNodeListMonitorActor.start(
-        config.new_cloud_client(), timer, poll_time, max_poll_time).proxy()
-    arvados_node_poller = ArvadosNodeListMonitorActor.start(
-        config.new_arvados_client(), timer, poll_time, max_poll_time).proxy()
-    job_queue_poller = JobQueueMonitorActor.start(
+    timer = TellActorProxy(TimedCallBackActor.start(poll_time / 10.0))
+    cloud_node_poller = TellActorProxy(CloudNodeListMonitorActor.start(
+        config.new_cloud_client(), timer, poll_time, max_poll_time))
+    arvados_node_poller = TellActorProxy(ArvadosNodeListMonitorActor.start(
+        config.new_arvados_client(), timer, poll_time, max_poll_time))
+    job_queue_poller = TellActorProxy(JobQueueMonitorActor.start(
         config.new_arvados_client(), timer, server_calculator,
-        poll_time, max_poll_time).proxy()
+        poll_time, max_poll_time))
     return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
 
 _caught_signals = {}
@@ -110,7 +111,7 @@ def main(args=None):
         server_calculator = build_server_calculator(config)
         timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
             launch_pollers(config, server_calculator)
-        cloud_node_updater = node_update.start(config.new_cloud_client).proxy()
+        cloud_node_updater = TellActorProxy(node_update.start(config.new_cloud_client))
         node_daemon = NodeManagerDaemonActor.start(
             job_queue_poller, arvados_node_poller, cloud_node_poller,
             cloud_node_updater, timer,
diff --git a/services/nodemanager/arvnodeman/timedcallback.py b/services/nodemanager/arvnodeman/timedcallback.py
index 615f798..a89e437 100644
--- a/services/nodemanager/arvnodeman/timedcallback.py
+++ b/services/nodemanager/arvnodeman/timedcallback.py
@@ -8,6 +8,7 @@ import time
 import pykka
 
 from .config import actor_class
+from .tellactorproxy import TellActorProxy
 
 class TimedCallBackActor(actor_class):
     """Send messages to other actors on a schedule.
@@ -18,7 +19,7 @@ class TimedCallBackActor(actor_class):
     """
     def __init__(self, max_sleep=1):
         super(TimedCallBackActor, self).__init__()
-        self._proxy = self.actor_ref.proxy()
+        self._proxy = TellActorProxy(self.actor_ref)
         self.messages = []
         self.max_sleep = max_sleep
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list