[ARVADOS] created: 223fd5187a0ecaa9f9c65be2a6733b4f3b56c99e
From: Git user <git at public.curoverse.com>
Date: Thu Mar 3 16:41:28 EST 2016
at 223fd5187a0ecaa9f9c65be2a6733b4f3b56c99e (commit)
commit 223fd5187a0ecaa9f9c65be2a6733b4f3b56c99e
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Mar 3 16:41:18 2016 -0500
8543: Implement BaseNodeMangerActor and TellableActorRef with tell_proxy().
Almost all tests pass, fixing remaining.
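For context, the new tell_proxy() API differs from pykka's ask-based ActorRef.proxy() in that proxied calls are delivered with ActorRef.tell() and never create futures. A minimal sketch of the two call styles, assuming pykka 1.x message semantics; ExampleActor is a hypothetical actor, not part of this commit:

import pykka

class ExampleActor(pykka.ThreadingActor):
    # Hypothetical actor used only to illustrate the two call styles.
    def ping(self):
        print("ping handled in the actor thread")

ref = ExampleActor.start()

# Ask-based proxy: each call returns a pykka future; .get() blocks for the result.
ref.proxy().ping().get()

# Tell-based call: fire-and-forget; this is the message that the new
# _TellCallableProxy builds for each proxied method in baseactor.py below.
ref.tell({'command': 'pykka_call',
          'attr_path': ('ping',),
          'args': (),
          'kwargs': {}})

ref.stop()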
diff --git a/services/nodemanager/arvnodeman/baseactor.py b/services/nodemanager/arvnodeman/baseactor.py
new file mode 100644
index 0000000..01a05be
--- /dev/null
+++ b/services/nodemanager/arvnodeman/baseactor.py
@@ -0,0 +1,65 @@
+from __future__ import absolute_import, print_function
+
+import errno
+import logging
+import os
+import threading
+import traceback
+
+import pykka
+
+class _TellCallableProxy(object):
+    """Internal helper class for proxying callables."""
+
+    def __init__(self, ref, attr_path):
+        self.actor_ref = ref
+        self._attr_path = attr_path
+
+    def __call__(self, *args, **kwargs):
+        message = {
+            'command': 'pykka_call',
+            'attr_path': self._attr_path,
+            'args': args,
+            'kwargs': kwargs,
+        }
+        self.actor_ref.tell(message)
+
+
+class TellActorProxy(pykka.ActorProxy):
+    """Like ActorProxy, but only permits asynchronous (tell) calls."""
+
+    def __repr__(self):
+        return '<ActorProxy for %s, attr_path=%s>' % (
+            self.actor_ref, self._attr_path)
+
+    def __getattr__(self, name):
+        """Get a callable from the actor."""
+        attr_path = self._attr_path + (name,)
+        if attr_path not in self._known_attrs:
+            self._known_attrs = self._get_attributes()
+        attr_info = self._known_attrs.get(attr_path)
+        if attr_info is None:
+            raise AttributeError('%s has no attribute "%s"' % (self, name))
+        if attr_info['callable']:
+            if attr_path not in self._callable_proxies:
+                self._callable_proxies[attr_path] = _TellCallableProxy(
+                    self.actor_ref, attr_path)
+            return self._callable_proxies[attr_path]
+        else:
+            raise AttributeError('attribute "%s" of %s is not callable' % (name, self))
+
+class TellableActorRef(pykka.ActorRef):
+    def tell_proxy(self):
+        return TellActorProxy(self)
+
+class BaseNodeMangerActor(pykka.ThreadingActor):
+    def __init__(self, *args, **kwargs):
+        super(pykka.ThreadingActor, self).__init__(*args, **kwargs)
+        self.actor_ref = TellableActorRef(self)
+
+    def on_failure(self, exception_type, exception_value, tb):
+        lg = getattr(self, "_logger", logging)
+        if (exception_type in (threading.ThreadError, MemoryError) or
+            exception_type is OSError and exception_value.errno == errno.ENOMEM):
+            lg.critical("Unhandled exception is a fatal error, killing Node Manager")
+            os.killpg(os.getpgid(0), 9)
diff --git a/services/nodemanager/arvnodeman/clientactor.py b/services/nodemanager/arvnodeman/clientactor.py
index 66e94af..e130749 100644
--- a/services/nodemanager/arvnodeman/clientactor.py
+++ b/services/nodemanager/arvnodeman/clientactor.py
@@ -8,7 +8,6 @@ import time
import pykka
from .config import actor_class
-from .tellactorproxy import TellActorProxy
def _notify_subscribers(response, subscribers):
"""Send the response to all the subscriber methods.
@@ -39,7 +38,7 @@ class RemotePollLoopActor(actor_class):
super(RemotePollLoopActor, self).__init__()
self._client = client
self._timer = timer_actor
- self._later = TellActorProxy(self.actor_ref)
+ self._later = self.actor_ref.tell_proxy()
self._polling_started = False
self.min_poll_wait = poll_wait
self.max_poll_wait = max_poll_wait
diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
index 028ad6f..ef37844 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
@@ -13,7 +13,6 @@ from .. import \
arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
arvados_node_missing, RetryMixin
from ...clientactor import _notify_subscribers
-from ...tellactorproxy import TellActorProxy
from ... import config
class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
@@ -27,7 +26,7 @@ class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
super(ComputeNodeStateChangeBase, self).__init__()
RetryMixin.__init__(self, retry_wait, max_retry_wait,
None, cloud_client, timer_actor)
- self._later = TellActorProxy(self.actor_ref)
+ self._later = self.actor_ref.tell_proxy()
self._arvados = arvados_client
self.subscribers = set()
@@ -39,7 +38,7 @@ class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
def _finished(self):
_notify_subscribers(self._later, self.subscribers)
- self.subscribers = None
+ self.subscribers.clear()
self._logger.info("finished")
def subscribe(self, subscriber):
@@ -226,6 +225,7 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
if not self._cloud.destroy_node(self.cloud_node):
if self._cloud.broken(self.cloud_node):
self._later.cancel_shutdown(self.NODE_BROKEN)
+ return
else:
# Force a retry.
raise cloud_types.LibcloudError("destroy_node failed")
@@ -305,7 +305,7 @@ class ComputeNodeMonitorActor(config.actor_class):
boot_fail_after=1800
):
super(ComputeNodeMonitorActor, self).__init__()
- self._later = TellActorProxy(self.actor_ref)
+ self._later = self.actor_ref.tell_proxy()
self._last_log = None
self._shutdowns = shutdown_timer
self._cloud_node_fqdn = cloud_fqdn_func
diff --git a/services/nodemanager/arvnodeman/config.py b/services/nodemanager/arvnodeman/config.py
index dcfe1ce..76fb228 100644
--- a/services/nodemanager/arvnodeman/config.py
+++ b/services/nodemanager/arvnodeman/config.py
@@ -12,7 +12,7 @@ import httplib2
import pykka
from apiclient import errors as apierror
-from .fullstopactor import FullStopActor
+from .baseactor import BaseNodeMangerActor
# IOError is the base class for socket.error, ssl.SSLError, and friends.
# It seems like it hits the sweet spot for operations we want to retry:
@@ -20,7 +20,7 @@ from .fullstopactor import FullStopActor
NETWORK_ERRORS = (IOError,)
ARVADOS_ERRORS = NETWORK_ERRORS + (apierror.Error,)
-actor_class = FullStopActor
+actor_class = BaseNodeMangerActor
class NodeManagerConfig(ConfigParser.SafeConfigParser):
"""Node Manager Configuration class.
diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index d3115e1..23804f7 100644
--- a/services/nodemanager/arvnodeman/daemon.py
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -11,12 +11,11 @@ import pykka
from . import computenode as cnode
from .computenode import dispatch
from .config import actor_class
-from .tellactorproxy import TellActorProxy
class _ComputeNodeRecord(object):
def __init__(self, actor=None, cloud_node=None, arvados_node=None,
assignment_time=float('-inf')):
- self.actor = actor.proxy()
+ self.actor = actor
self.cloud_node = cloud_node
self.arvados_node = arvados_node
self.assignment_time = assignment_time
@@ -122,7 +121,7 @@ class NodeManagerDaemonActor(actor_class):
self._new_arvados = arvados_factory
self._new_cloud = cloud_factory
self._cloud_driver = self._new_cloud()
- self._later = TellActorProxy(self.actor_ref)
+ self._later = self.actor_ref.tell_proxy()
self.shutdown_windows = shutdown_windows
self.server_calculator = server_calculator
self.min_cloud_size = self.server_calculator.cheapest_size()
@@ -176,11 +175,11 @@ class NodeManagerDaemonActor(actor_class):
node_stale_after=self.node_stale_after,
cloud_client=self._cloud_driver,
boot_fail_after=self.boot_fail_after)
- actorTell = TellActorProxy(actor)
+ actorTell = actor.tell_proxy()
actorTell.subscribe(self._later.node_can_shutdown)
self._cloud_nodes_actor.subscribe_to(cloud_node.id,
actorTell.update_cloud_node)
- record = _ComputeNodeRecord(actor, cloud_node)
+ record = _ComputeNodeRecord(actor.proxy(), cloud_node)
return record
def update_cloud_nodes(self, nodelist):
@@ -199,10 +198,11 @@ class NodeManagerDaemonActor(actor_class):
for key, record in self.cloud_nodes.orphans.iteritems():
if key in self.shutdowns:
try:
- self.shutdowns[key].stop()
+ self.shutdowns[key].stop().get()
except pykka.ActorDeadError:
pass
del self.shutdowns[key]
+ if key in self.sizes_booting_shutdown:
del self.sizes_booting_shutdown[key]
record.actor.stop()
record.cloud_node = None
@@ -357,12 +357,12 @@ class NodeManagerDaemonActor(actor_class):
arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
self._logger.info("Want %i more %s nodes. Booting a node.",
nodes_wanted, cloud_size.name)
- new_setup = TellActorProxy(self._node_setup.start(
+ new_setup = self._node_setup.start(
timer_actor=self._timer,
arvados_client=self._new_arvados(),
arvados_node=arvados_node,
cloud_client=self._new_cloud(),
- cloud_size=cloud_size))
+ cloud_size=cloud_size).tell_proxy()
self.booting[new_setup.actor_ref.actor_urn] = new_setup
self.sizes_booting_shutdown[new_setup.actor_ref.actor_urn] = cloud_size
@@ -409,13 +409,13 @@ class NodeManagerDaemonActor(actor_class):
cloud_node_id = cloud_node_obj.id
if cloud_node_id in self.shutdowns:
return None
- shutdown = TellActorProxy(self._node_shutdown.start(
+ shutdown = self._node_shutdown.start(
timer_actor=self._timer, cloud_client=self._new_cloud(),
arvados_client=self._new_arvados(),
- node_monitor=node_actor.actor_ref, cancellable=cancellable))
+ node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
self.shutdowns[cloud_node_id] = shutdown
self.sizes_booting_shutdown[cloud_node_id] = cloud_node_obj.size
- shutdown.subscribe(self._later.node_finished_shutdown)
+ shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)
@_check_poll_freshness
def node_can_shutdown(self, node_actor):
@@ -440,12 +440,9 @@ class NodeManagerDaemonActor(actor_class):
if not success:
if cancel_reason == self._node_shutdown.NODE_BROKEN:
self.cloud_nodes.blacklist(cloud_node_id)
- del self.shutdowns[cloud_node_id]
- del self.sizes_booting_shutdown[cloud_node_id]
elif cloud_node_id in self.booted:
self.booted.pop(cloud_node_id).actor.stop()
- del self.shutdowns[cloud_node_id]
- del self.sizes_booting_shutdown[cloud_node_id]
+ del self.shutdowns[cloud_node_id]
def shutdown(self):
self._logger.info("Shutting down after signal.")
diff --git a/services/nodemanager/arvnodeman/fullstopactor.py b/services/nodemanager/arvnodeman/fullstopactor.py
deleted file mode 100644
index 07e0625..0000000
--- a/services/nodemanager/arvnodeman/fullstopactor.py
+++ /dev/null
@@ -1,17 +0,0 @@
-from __future__ import absolute_import, print_function
-
-import errno
-import logging
-import os
-import threading
-import traceback
-
-import pykka
-
-class FullStopActor(pykka.ThreadingActor):
-    def on_failure(self, exception_type, exception_value, tb):
-        lg = getattr(self, "_logger", logging)
-        if (exception_type in (threading.ThreadError, MemoryError) or
-            exception_type is OSError and exception_value.errno == errno.ENOMEM):
-            lg.critical("Unhandled exception is a fatal error, killing Node Manager")
-            os.killpg(os.getpgid(0), 9)
diff --git a/services/nodemanager/arvnodeman/launcher.py b/services/nodemanager/arvnodeman/launcher.py
index 3586c4b..78bd2db 100644
--- a/services/nodemanager/arvnodeman/launcher.py
+++ b/services/nodemanager/arvnodeman/launcher.py
@@ -16,7 +16,6 @@ from .daemon import NodeManagerDaemonActor
from .jobqueue import JobQueueMonitorActor, ServerCalculator
from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
from .timedcallback import TimedCallBackActor
-from .tellactorproxy import TellActorProxy
node_daemon = None
@@ -70,14 +69,14 @@ def launch_pollers(config, server_calculator):
poll_time = config.getint('Daemon', 'poll_time')
max_poll_time = config.getint('Daemon', 'max_poll_time')
- timer = TellActorProxy(TimedCallBackActor.start(poll_time / 10.0))
- cloud_node_poller = TellActorProxy(CloudNodeListMonitorActor.start(
- config.new_cloud_client(), timer, poll_time, max_poll_time))
- arvados_node_poller = TellActorProxy(ArvadosNodeListMonitorActor.start(
- config.new_arvados_client(), timer, poll_time, max_poll_time))
- job_queue_poller = TellActorProxy(JobQueueMonitorActor.start(
+ timer = TimedCallBackActor.start(poll_time / 10.0).tell_proxy()
+ cloud_node_poller = CloudNodeListMonitorActor.start(
+ config.new_cloud_client(), timer, poll_time, max_poll_time).tell_proxy()
+ arvados_node_poller = ArvadosNodeListMonitorActor.start(
+ config.new_arvados_client(), timer, poll_time, max_poll_time).tell_proxy()
+ job_queue_poller = JobQueueMonitorActor.start(
config.new_arvados_client(), timer, server_calculator,
- poll_time, max_poll_time))
+ poll_time, max_poll_time).tell_proxy()
return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
_caught_signals = {}
@@ -111,7 +110,7 @@ def main(args=None):
server_calculator = build_server_calculator(config)
timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
launch_pollers(config, server_calculator)
- cloud_node_updater = TellActorProxy(node_update.start(config.new_cloud_client))
+ cloud_node_updater = node_update.start(config.new_cloud_client).tell_proxy()
node_daemon = NodeManagerDaemonActor.start(
job_queue_poller, arvados_node_poller, cloud_node_poller,
cloud_node_updater, timer,
@@ -124,7 +123,7 @@ def main(args=None):
config.getint('Daemon', 'boot_fail_after'),
config.getint('Daemon', 'node_stale_after'),
node_setup, node_shutdown, node_monitor,
- max_total_price=config.getfloat('Daemon', 'max_total_price')).proxy()
+ max_total_price=config.getfloat('Daemon', 'max_total_price')).tell_proxy()
signal.pause()
daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
diff --git a/services/nodemanager/arvnodeman/timedcallback.py b/services/nodemanager/arvnodeman/timedcallback.py
index a89e437..12d6280 100644
--- a/services/nodemanager/arvnodeman/timedcallback.py
+++ b/services/nodemanager/arvnodeman/timedcallback.py
@@ -8,7 +8,6 @@ import time
import pykka
from .config import actor_class
-from .tellactorproxy import TellActorProxy
class TimedCallBackActor(actor_class):
"""Send messages to other actors on a schedule.
@@ -19,7 +18,7 @@ class TimedCallBackActor(actor_class):
"""
def __init__(self, max_sleep=1):
super(TimedCallBackActor, self).__init__()
- self._proxy = TellActorProxy(self.actor_ref)
+ self._proxy = self.actor_ref.tell_proxy()
self.messages = []
self.max_sleep = max_sleep
diff --git a/services/nodemanager/tests/test_daemon.py b/services/nodemanager/tests/test_daemon.py
index f41fa6c..554fb88 100644
--- a/services/nodemanager/tests/test_daemon.py
+++ b/services/nodemanager/tests/test_daemon.py
@@ -26,6 +26,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
cloud_size=get_cloud_size,
actor_ref=mock_actor)
mock_actor.proxy.return_value = mock_proxy
+ mock_actor.tell_proxy.return_value = mock_proxy
self.last_setup = mock_proxy
return mock_actor
diff --git a/services/nodemanager/tests/test_failure.py b/services/nodemanager/tests/test_failure.py
index afebb9c..aa497a4 100644
--- a/services/nodemanager/tests/test_failure.py
+++ b/services/nodemanager/tests/test_failure.py
@@ -12,9 +12,9 @@ import pykka
from . import testutil
-import arvnodeman.fullstopactor
+import arvnodeman.baseactor
-class BogusActor(arvnodeman.fullstopactor.FullStopActor):
+class BogusActor(arvnodeman.baseactor.BaseNodeMangerActor):
def __init__(self, e):
super(BogusActor, self).__init__()
self.exp = e
@@ -23,26 +23,17 @@ class BogusActor(arvnodeman.fullstopactor.FullStopActor):
raise self.exp
class ActorUnhandledExceptionTest(unittest.TestCase):
- def test1(self):
+ def test_fatal_error(self):
for e in (MemoryError(), threading.ThreadError(), OSError(errno.ENOMEM, "")):
with mock.patch('os.killpg') as killpg_mock:
- act = BogusActor.start(e)
- act.tell({
- 'command': 'pykka_call',
- 'attr_path': ("doStuff",),
- 'args': [],
- 'kwargs': {}
- })
- act.stop(block=True)
+ act = BogusActor.start(e).tell_proxy()
+ act.doStuff()
+ act.actor_ref.stop(block=True)
self.assertTrue(killpg_mock.called)
+ def test_nonfatal_error(self):
with mock.patch('os.killpg') as killpg_mock:
- act = BogusActor.start(OSError(errno.ENOENT, ""))
- act.tell({
- 'command': 'pykka_call',
- 'attr_path': ("doStuff",),
- 'args': [],
- 'kwargs': {}
- })
- act.stop(block=True)
+ act = BogusActor.start(OSError(errno.ENOENT, "")).tell_proxy()
+ act.doStuff()
+ act.actor_ref.stop(block=True)
self.assertFalse(killpg_mock.called)
diff --git a/services/nodemanager/tests/testutil.py b/services/nodemanager/tests/testutil.py
index 6cde766..5803b05 100644
--- a/services/nodemanager/tests/testutil.py
+++ b/services/nodemanager/tests/testutil.py
@@ -85,7 +85,10 @@ class MockTimer(object):
to_deliver = self.messages
self.messages = []
for callback, args, kwargs in to_deliver:
- callback(*args, **kwargs)
+ try:
+ callback(*args, **kwargs)
+ except pykka.ActorDeadError:
+ pass
def schedule(self, want_time, callback, *args, **kwargs):
with self.lock:
commit 8cb07346b3b7467cd74b187605ad665fae98a520
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Mar 3 13:54:30 2016 -0500
8543: Convert most pykka proxies to TellActorProxy
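The hunks below mechanically replace ask-style proxies with tell-only ones. A representative before/after, based on the _later attribute pattern used throughout these actors:

# Before: ask-based proxy; every call allocates a pykka future even when
# the caller never waits on it.
self._later = self.actor_ref.proxy()

# After: tell-only proxy; calls are delivered with ActorRef.tell() and
# cannot be waited on.
self._later = TellActorProxy(self.actor_ref)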
diff --git a/services/nodemanager/arvnodeman/clientactor.py b/services/nodemanager/arvnodeman/clientactor.py
index 9a9ce58..66e94af 100644
--- a/services/nodemanager/arvnodeman/clientactor.py
+++ b/services/nodemanager/arvnodeman/clientactor.py
@@ -8,6 +8,7 @@ import time
import pykka
from .config import actor_class
+from .tellactorproxy import TellActorProxy
def _notify_subscribers(response, subscribers):
"""Send the response to all the subscriber methods.
@@ -38,7 +39,7 @@ class RemotePollLoopActor(actor_class):
super(RemotePollLoopActor, self).__init__()
self._client = client
self._timer = timer_actor
- self._later = self.actor_ref.proxy()
+ self._later = TellActorProxy(self.actor_ref)
self._polling_started = False
self.min_poll_wait = poll_wait
self.max_poll_wait = max_poll_wait
diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
index 2ae4fc8..028ad6f 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
@@ -13,6 +13,7 @@ from .. import \
arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
arvados_node_missing, RetryMixin
from ...clientactor import _notify_subscribers
+from ...tellactorproxy import TellActorProxy
from ... import config
class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
@@ -26,7 +27,7 @@ class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
super(ComputeNodeStateChangeBase, self).__init__()
RetryMixin.__init__(self, retry_wait, max_retry_wait,
None, cloud_client, timer_actor)
- self._later = self.actor_ref.proxy()
+ self._later = TellActorProxy(self.actor_ref)
self._arvados = arvados_client
self.subscribers = set()
@@ -304,7 +305,7 @@ class ComputeNodeMonitorActor(config.actor_class):
boot_fail_after=1800
):
super(ComputeNodeMonitorActor, self).__init__()
- self._later = self.actor_ref.proxy()
+ self._later = TellActorProxy(self.actor_ref)
self._last_log = None
self._shutdowns = shutdown_timer
self._cloud_node_fqdn = cloud_fqdn_func
diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index 0993c47..d3115e1 100644
--- a/services/nodemanager/arvnodeman/daemon.py
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -11,11 +11,12 @@ import pykka
from . import computenode as cnode
from .computenode import dispatch
from .config import actor_class
+from .tellactorproxy import TellActorProxy
class _ComputeNodeRecord(object):
def __init__(self, actor=None, cloud_node=None, arvados_node=None,
assignment_time=float('-inf')):
- self.actor = actor
+ self.actor = actor.proxy()
self.cloud_node = cloud_node
self.arvados_node = arvados_node
self.assignment_time = assignment_time
@@ -121,7 +122,7 @@ class NodeManagerDaemonActor(actor_class):
self._new_arvados = arvados_factory
self._new_cloud = cloud_factory
self._cloud_driver = self._new_cloud()
- self._later = self.actor_ref.proxy()
+ self._later = TellActorProxy(self.actor_ref)
self.shutdown_windows = shutdown_windows
self.server_calculator = server_calculator
self.min_cloud_size = self.server_calculator.cheapest_size()
@@ -174,10 +175,11 @@ class NodeManagerDaemonActor(actor_class):
poll_stale_after=self.poll_stale_after,
node_stale_after=self.node_stale_after,
cloud_client=self._cloud_driver,
- boot_fail_after=self.boot_fail_after).proxy()
- actor.subscribe(self._later.node_can_shutdown)
+ boot_fail_after=self.boot_fail_after)
+ actorTell = TellActorProxy(actor)
+ actorTell.subscribe(self._later.node_can_shutdown)
self._cloud_nodes_actor.subscribe_to(cloud_node.id,
- actor.update_cloud_node)
+ actorTell.update_cloud_node)
record = _ComputeNodeRecord(actor, cloud_node)
return record
@@ -197,7 +199,7 @@ class NodeManagerDaemonActor(actor_class):
for key, record in self.cloud_nodes.orphans.iteritems():
if key in self.shutdowns:
try:
- self.shutdowns[key].stop().get()
+ self.shutdowns[key].stop()
except pykka.ActorDeadError:
pass
del self.shutdowns[key]
@@ -355,12 +357,12 @@ class NodeManagerDaemonActor(actor_class):
arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
self._logger.info("Want %i more %s nodes. Booting a node.",
nodes_wanted, cloud_size.name)
- new_setup = self._node_setup.start(
+ new_setup = TellActorProxy(self._node_setup.start(
timer_actor=self._timer,
arvados_client=self._new_arvados(),
arvados_node=arvados_node,
cloud_client=self._new_cloud(),
- cloud_size=cloud_size).proxy()
+ cloud_size=cloud_size))
self.booting[new_setup.actor_ref.actor_urn] = new_setup
self.sizes_booting_shutdown[new_setup.actor_ref.actor_urn] = cloud_size
@@ -407,10 +409,10 @@ class NodeManagerDaemonActor(actor_class):
cloud_node_id = cloud_node_obj.id
if cloud_node_id in self.shutdowns:
return None
- shutdown = self._node_shutdown.start(
+ shutdown = TellActorProxy(self._node_shutdown.start(
timer_actor=self._timer, cloud_client=self._new_cloud(),
arvados_client=self._new_arvados(),
- node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
+ node_monitor=node_actor.actor_ref, cancellable=cancellable))
self.shutdowns[cloud_node_id] = shutdown
self.sizes_booting_shutdown[cloud_node_id] = cloud_node_obj.size
shutdown.subscribe(self._later.node_finished_shutdown)
diff --git a/services/nodemanager/arvnodeman/launcher.py b/services/nodemanager/arvnodeman/launcher.py
index c8b3d19..3586c4b 100644
--- a/services/nodemanager/arvnodeman/launcher.py
+++ b/services/nodemanager/arvnodeman/launcher.py
@@ -16,6 +16,7 @@ from .daemon import NodeManagerDaemonActor
from .jobqueue import JobQueueMonitorActor, ServerCalculator
from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
from .timedcallback import TimedCallBackActor
+from .tellactorproxy import TellActorProxy
node_daemon = None
@@ -69,14 +70,14 @@ def launch_pollers(config, server_calculator):
poll_time = config.getint('Daemon', 'poll_time')
max_poll_time = config.getint('Daemon', 'max_poll_time')
- timer = TimedCallBackActor.start(poll_time / 10.0).proxy()
- cloud_node_poller = CloudNodeListMonitorActor.start(
- config.new_cloud_client(), timer, poll_time, max_poll_time).proxy()
- arvados_node_poller = ArvadosNodeListMonitorActor.start(
- config.new_arvados_client(), timer, poll_time, max_poll_time).proxy()
- job_queue_poller = JobQueueMonitorActor.start(
+ timer = TellActorProxy(TimedCallBackActor.start(poll_time / 10.0))
+ cloud_node_poller = TellActorProxy(CloudNodeListMonitorActor.start(
+ config.new_cloud_client(), timer, poll_time, max_poll_time))
+ arvados_node_poller = TellActorProxy(ArvadosNodeListMonitorActor.start(
+ config.new_arvados_client(), timer, poll_time, max_poll_time))
+ job_queue_poller = TellActorProxy(JobQueueMonitorActor.start(
config.new_arvados_client(), timer, server_calculator,
- poll_time, max_poll_time).proxy()
+ poll_time, max_poll_time))
return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
_caught_signals = {}
@@ -110,7 +111,7 @@ def main(args=None):
server_calculator = build_server_calculator(config)
timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
launch_pollers(config, server_calculator)
- cloud_node_updater = node_update.start(config.new_cloud_client).proxy()
+ cloud_node_updater = TellActorProxy(node_update.start(config.new_cloud_client))
node_daemon = NodeManagerDaemonActor.start(
job_queue_poller, arvados_node_poller, cloud_node_poller,
cloud_node_updater, timer,
diff --git a/services/nodemanager/arvnodeman/timedcallback.py b/services/nodemanager/arvnodeman/timedcallback.py
index 615f798..a89e437 100644
--- a/services/nodemanager/arvnodeman/timedcallback.py
+++ b/services/nodemanager/arvnodeman/timedcallback.py
@@ -8,6 +8,7 @@ import time
import pykka
from .config import actor_class
+from .tellactorproxy import TellActorProxy
class TimedCallBackActor(actor_class):
"""Send messages to other actors on a schedule.
@@ -18,7 +19,7 @@ class TimedCallBackActor(actor_class):
"""
def __init__(self, max_sleep=1):
super(TimedCallBackActor, self).__init__()
- self._proxy = self.actor_ref.proxy()
+ self._proxy = TellActorProxy(self.actor_ref)
self.messages = []
self.max_sleep = max_sleep
-----------------------------------------------------------------------
hooks/post-receive
--