[ARVADOS] updated: 1aabac3cbaf0d701696ffabceb992826abeea8c5
git at public.curoverse.com
Wed Oct 22 10:33:35 EDT 2014
Summary of changes:
.../nodemanager/arvnodeman/computenode/__init__.py | 130 +++++++++++----------
services/nodemanager/arvnodeman/daemon.py | 23 +++-
services/nodemanager/tests/test_computenode.py | 8 ++
services/nodemanager/tests/test_daemon.py | 90 ++++++++++++--
4 files changed, 177 insertions(+), 74 deletions(-)
via 1aabac3cbaf0d701696ffabceb992826abeea8c5 (commit)
via 8d2639525417aaa02240777454405f2249d505b0 (commit)
via ca1ddb913b35eaa4aea43a88501c5b7c6ab95e2c (commit)
from 06686c47a0f99f5b5f292cde2d391024bf107514 (commit)
The revisions listed above that are new to this repository have not
appeared in any other notification email, so they are listed in full
below.
commit 1aabac3cbaf0d701696ffabceb992826abeea8c5
Merge: 06686c4 8d26395
Author: Brett Smith <brett at curoverse.com>
Date: Wed Oct 22 10:35:21 2014 -0400
Merge branch '4139-node-manager-race-fix-wip'
Refs #4139, #4275.
commit 8d2639525417aaa02240777454405f2249d505b0
Author: Brett Smith <brett at curoverse.com>
Date: Wed Oct 22 10:34:23 2014 -0400
4139: Node Manager more closely tracks nodes it boots.
When running Node Manager on EC2, we saw a race condition where a node
would finish setting up but would not show up in cloud node listings
right away. This adds a "booted" state to the daemon: a node booted by
Node Manager is assumed to be up and running unless it disappears from
a listing *after* having appeared in one. Once the node shows up in a
listing, it is promoted to the normal workflow.
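
The gist of the change, as a minimal sketch in plain Python (simplified
names and plain dicts rather than the daemon's tracker classes; the real
implementation is in the daemon.py diff below):

    # Minimal sketch of the new "booted" state. BootedNodeTrackerSketch
    # and its dicts are illustrative only, not the actual daemon code.
    class BootedNodeTrackerSketch(object):
        def __init__(self):
            self.booting = {}      # setup keys -> records still being set up
            self.booted = {}       # cloud node IDs -> booted but not yet listed
            self.cloud_nodes = {}  # cloud node IDs -> confirmed by a listing

        def node_up(self, cloud_node_id, record):
            # Setup finished: trust that the node exists even if the cloud
            # listing has not caught up with it yet.
            self.booted[cloud_node_id] = record

        def update_cloud_nodes(self, listed_ids):
            # Promote booted nodes once they finally show up in a listing...
            for node_id in listed_ids:
                if node_id in self.booted:
                    self.cloud_nodes[node_id] = self.booted.pop(node_id)
            # ...and only treat a node as gone if it disappears *after*
            # having been listed.
            for node_id in list(self.cloud_nodes):
                if node_id not in listed_ids:
                    del self.cloud_nodes[node_id]

        def node_count(self):
            # Booted-but-unlisted nodes still count toward the size wishlist.
            return len(self.cloud_nodes) + len(self.booted) + len(self.booting)

The real daemon uses its _CloudNodeTracker and _ArvadosNodeTracker
classes and also handles the case where a booting node shows up in a
listing before the setup actor reports node_up.
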
diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index 5b7437f..803e9ba 100644
--- a/services/nodemanager/arvnodeman/daemon.py
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -123,6 +123,7 @@ class NodeManagerDaemonActor(actor_class):
self.cloud_nodes = _CloudNodeTracker()
self.arvados_nodes = _ArvadosNodeTracker()
self.booting = {} # Actor IDs to ComputeNodeSetupActors
+ self.booted = {} # Cloud node IDs to _ComputeNodeRecords
self.shutdowns = {} # Cloud node IDs to ComputeNodeShutdownActors
self._logger.debug("Daemon initialized")
@@ -154,22 +155,24 @@ class NodeManagerDaemonActor(actor_class):
self._cloud_nodes_actor.subscribe_to(cloud_node.id,
actor.update_cloud_node)
record = _ComputeNodeRecord(actor, cloud_node)
- self.cloud_nodes.add(record)
return record
def update_cloud_nodes(self, nodelist):
self._update_poll_time('cloud_nodes')
for key, node in self.cloud_nodes.update_from(nodelist):
self._logger.info("Registering new cloud node %s", key)
- record = self._new_node(node)
+ if key in self.booting:
+ record = self.booting.pop(key)
+ else:
+ record = self._new_node(node)
+ self.cloud_nodes.add(record)
for arv_rec in self.arvados_nodes.unpaired():
if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
self._pair_nodes(record, arv_rec.arvados_node)
break
for key, record in self.cloud_nodes.orphans.iteritems():
record.actor.stop()
- if key in self.shutdowns:
- self.shutdowns.pop(key).stop()
+ self.shutdowns.pop(key, None)
def update_arvados_nodes(self, nodelist):
self._update_poll_time('arvados_nodes')
@@ -185,7 +188,8 @@ class NodeManagerDaemonActor(actor_class):
break
def _node_count(self):
- up = sum(len(nodelist) for nodelist in [self.cloud_nodes, self.booting])
+ up = sum(len(nodelist) for nodelist in
+ [self.cloud_nodes, self.booted, self.booting])
return up - len(self.shutdowns)
def _nodes_wanted(self):
@@ -253,6 +257,7 @@ class NodeManagerDaemonActor(actor_class):
record = self.cloud_nodes.get(cloud_node.id)
if record is None:
record = self._new_node(cloud_node)
+ self.booted[cloud_node.id] = record
self._pair_nodes(record, arvados_node)
@_check_poll_freshness
@@ -279,6 +284,14 @@ class NodeManagerDaemonActor(actor_class):
cloud_client=self._new_cloud(),
cloud_node=cloud_node).proxy()
self.shutdowns[cloud_node.id] = shutdown
+ shutdown.subscribe(self._later.node_finished_shutdown)
+
+ def node_finished_shutdown(self, shutdown_actor):
+ cloud_node_id = shutdown_actor.cloud_node.get().id
+ shutdown_actor.stop()
+ if cloud_node_id in self.booted:
+ self.booted.pop(cloud_node_id).actor.stop()
+ del self.shutdowns[cloud_node_id]
def shutdown(self):
self._logger.info("Shutting down after signal.")
diff --git a/services/nodemanager/tests/test_daemon.py b/services/nodemanager/tests/test_daemon.py
index 0a63222..a9ebf42 100644
--- a/services/nodemanager/tests/test_daemon.py
+++ b/services/nodemanager/tests/test_daemon.py
@@ -100,23 +100,93 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
self.stop_proxy(self.daemon)
self.assertEqual(1, self.node_setup.start.call_count)
+ def mock_setup_actor(self, cloud_node, arv_node):
+ setup = mock.MagicMock(name='setup_node_mock')
+ setup.actor_ref = self.node_setup.start().proxy().actor_ref
+ self.node_setup.reset_mock()
+ setup.actor_urn = cloud_node.id
+ setup.cloud_node.get.return_value = cloud_node
+ setup.arvados_node.get.return_value = arv_node
+ return setup
+
+ def start_node_boot(self, cloud_node=None, arv_node=None, id_num=1):
+ if cloud_node is None:
+ cloud_node = testutil.cloud_node_mock(id_num)
+ if arv_node is None:
+ arv_node = testutil.arvados_node_mock(id_num)
+ self.make_daemon(want_sizes=[testutil.MockSize(id_num)])
+ self.daemon.max_nodes.get(self.TIMEOUT)
+ self.assertEqual(1, self.node_setup.start.call_count)
+ return self.mock_setup_actor(cloud_node, arv_node)
+
def test_no_duplication_when_booting_node_listed_fast(self):
# Test that we don't start two ComputeNodeMonitorActors when
# we learn about a booting node through a listing before we
# get the "node up" message from CloudNodeSetupActor.
cloud_node = testutil.cloud_node_mock(1)
- self.make_daemon(want_sizes=[testutil.MockSize(1)])
- self.daemon.max_nodes.get(self.TIMEOUT)
- self.assertEqual(1, self.node_setup.start.call_count)
- setup = mock.MagicMock(name='setup_node_mock')
- setup.actor_ref = self.node_setup.start().proxy().actor_ref
- setup.cloud_node.get.return_value = cloud_node
- setup.arvados_node.get.return_value = testutil.arvados_node_mock(1)
+ setup = self.start_node_boot(cloud_node)
self.daemon.update_cloud_nodes([cloud_node]).get(self.TIMEOUT)
self.assertTrue(self.node_factory.start.called)
self.daemon.node_up(setup).get(self.TIMEOUT)
self.assertEqual(1, self.node_factory.start.call_count)
+ def test_node_counted_after_boot_with_slow_listing(self):
+ # Test that, after we boot a compute node, we assume it exists
+ # even if it doesn't appear in the listing (e.g., because of delays
+ # propagating tags).
+ setup = self.start_node_boot()
+ self.daemon.node_up(setup).get(self.TIMEOUT)
+ self.assertTrue(self.node_factory.start.called,
+ "daemon not monitoring booted node")
+ self.daemon.update_cloud_nodes([])
+ self.stop_proxy(self.daemon)
+ self.assertEqual(1, self.node_factory.start.call_count,
+ "daemon has duplicate monitors for booted node")
+ self.assertFalse(self.node_factory.start().proxy().stop.called,
+ "daemon prematurely stopped monitoring a new node")
+
+ def test_booted_unlisted_node_counted(self):
+ setup = self.start_node_boot(id_num=1)
+ self.daemon.node_up(setup)
+ self.daemon.update_server_wishlist(
+ [testutil.MockSize(1)]).get(self.TIMEOUT)
+ self.stop_proxy(self.daemon)
+ self.assertFalse(self.node_setup.start.called,
+ "daemon did not count booted node toward wishlist")
+
+ def test_booted_node_can_shutdown(self):
+ setup = self.start_node_boot()
+ self.daemon.node_up(setup)
+ self.daemon.update_server_wishlist([])
+ self.daemon.node_can_shutdown(
+ self.node_factory.start().proxy()).get(self.TIMEOUT)
+ self.stop_proxy(self.daemon)
+ self.assertTrue(self.node_shutdown.start.called,
+ "daemon did not shut down booted node on offer")
+
+ def test_booted_node_lifecycle(self):
+ cloud_node = testutil.cloud_node_mock(6)
+ setup = self.start_node_boot(cloud_node, id_num=6)
+ self.daemon.node_up(setup)
+ self.daemon.update_server_wishlist([])
+ monitor = self.node_factory.start().proxy()
+ monitor.cloud_node.get.return_value = cloud_node
+ self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
+ self.assertTrue(self.node_shutdown.start.called,
+ "daemon did not shut down booted node on offer")
+ shutdown = self.node_shutdown.start().proxy()
+ shutdown.cloud_node.get.return_value = cloud_node
+ self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
+ self.assertTrue(shutdown.stop.called,
+ "shutdown actor not stopped after finishing")
+ self.assertTrue(monitor.stop.called,
+ "monitor for booted node not stopped after shutdown")
+ self.daemon.update_server_wishlist(
+ [testutil.MockSize(2)]).get(self.TIMEOUT)
+ self.stop_proxy(self.daemon)
+ self.assertTrue(self.node_setup.start.called,
+ "second node not started after booted node stopped")
+
def test_booting_nodes_shut_down(self):
self.make_daemon(want_sizes=[testutil.MockSize(1)])
self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
@@ -128,10 +198,10 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
cloud_node = testutil.cloud_node_mock(1)
size = testutil.MockSize(1)
self.make_daemon(cloud_nodes=[cloud_node], want_sizes=[size])
- node_actor = self.node_factory().proxy()
- self.daemon.node_can_shutdown(node_actor).get(self.TIMEOUT)
+ self.daemon.node_can_shutdown(
+ self.node_factory.start().proxy()).get(self.TIMEOUT)
self.stop_proxy(self.daemon)
- self.assertFalse(node_actor.shutdown.called)
+ self.assertFalse(self.node_shutdown.start.called)
def test_shutdown_accepted_below_capacity(self):
self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
commit ca1ddb913b35eaa4aea43a88501c5b7c6ab95e2c
Author: Brett Smith <brett at curoverse.com>
Date: Wed Oct 22 10:02:33 2014 -0400
4139: Node Manager supports subscribing to shutdown events.
This is necessary to fix a race condition. The daemon needs to be
able to track nodes that it has booted even if they don't appear in
listings.
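
As a rough usage sketch of the new hook (timer, cloud, and node are
placeholders for objects the daemon already holds; the real wiring is
in the computenode diff below and the daemon.py diff above):

    # Sketch only: subscribe a callback that fires once the cloud node
    # has actually been destroyed.
    from arvnodeman.computenode import ComputeNodeShutdownActor

    shutdown = ComputeNodeShutdownActor.start(
        timer_actor=timer, cloud_client=cloud, cloud_node=node).proxy()

    def on_shutdown_finished(shutdown_proxy):
        # Lets the daemon drop its bookkeeping for a node that never
        # appeared in a cloud listing.
        print("finished shutting down %s" % shutdown_proxy.cloud_node.get().id)

    shutdown.subscribe(on_shutdown_finished)

The daemon-side subscriber, node_finished_shutdown, is added in the
daemon.py diff earlier in this message.
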
diff --git a/services/nodemanager/arvnodeman/computenode/__init__.py b/services/nodemanager/arvnodeman/computenode/__init__.py
index ae25428..0d4ee7b 100644
--- a/services/nodemanager/arvnodeman/computenode/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/__init__.py
@@ -23,33 +23,6 @@ def arvados_node_mtime(node):
def timestamp_fresh(timestamp, fresh_time):
return (time.time() - timestamp) < fresh_time
-def _retry(errors):
- """Retry decorator for an actor method that makes remote requests.
-
- Use this function to decorate an actor method, and pass in a tuple of
- exceptions to catch. This decorator will schedule retries of that method
- with exponential backoff if the original method raises any of the given
- errors.
- """
- def decorator(orig_func):
- @functools.wraps(orig_func)
- def wrapper(self, *args, **kwargs):
- try:
- orig_func(self, *args, **kwargs)
- except errors as error:
- self._logger.warning(
- "Client error: %s - waiting %s seconds",
- error, self.retry_wait)
- self._timer.schedule(self.retry_wait,
- getattr(self._later, orig_func.__name__),
- *args, **kwargs)
- self.retry_wait = min(self.retry_wait * 2,
- self.max_retry_wait)
- else:
- self.retry_wait = self.min_retry_wait
- return wrapper
- return decorator
-
class BaseComputeNodeDriver(object):
"""Abstract base class for compute node drivers.
@@ -114,7 +87,66 @@ class BaseComputeNodeDriver(object):
ComputeNodeDriverClass = BaseComputeNodeDriver
-class ComputeNodeSetupActor(config.actor_class):
+class ComputeNodeStateChangeBase(config.actor_class):
+ """Base class for actors that change a compute node's state.
+
+ This base class takes care of retrying changes and notifying
+ subscribers when the change is finished.
+ """
+ def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait):
+ super(ComputeNodeStateChangeBase, self).__init__()
+ self._later = self.actor_ref.proxy()
+ self._timer = timer_actor
+ self._logger = logging.getLogger(logger_name)
+ self.min_retry_wait = retry_wait
+ self.max_retry_wait = max_retry_wait
+ self.retry_wait = retry_wait
+ self.subscribers = set()
+
+ @staticmethod
+ def _retry(errors):
+ """Retry decorator for an actor method that makes remote requests.
+
+ Use this function to decorate an actor method, and pass in a
+ tuple of exceptions to catch. This decorator will schedule
+ retries of that method with exponential backoff if the
+ original method raises any of the given errors.
+ """
+ def decorator(orig_func):
+ @functools.wraps(orig_func)
+ def wrapper(self, *args, **kwargs):
+ try:
+ orig_func(self, *args, **kwargs)
+ except errors as error:
+ self._logger.warning(
+ "Client error: %s - waiting %s seconds",
+ error, self.retry_wait)
+ self._timer.schedule(self.retry_wait,
+ getattr(self._later,
+ orig_func.__name__),
+ *args, **kwargs)
+ self.retry_wait = min(self.retry_wait * 2,
+ self.max_retry_wait)
+ else:
+ self.retry_wait = self.min_retry_wait
+ return wrapper
+ return decorator
+
+ def _finished(self):
+ _notify_subscribers(self._later, self.subscribers)
+ self.subscribers = None
+
+ def subscribe(self, subscriber):
+ if self.subscribers is None:
+ try:
+ subscriber(self._later)
+ except pykka.ActorDeadError:
+ pass
+ else:
+ self.subscribers.add(subscriber)
+
+
+class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
"""Actor to create and set up a cloud compute node.
This actor prepares an Arvados node record for a new compute node
@@ -126,17 +158,11 @@ class ComputeNodeSetupActor(config.actor_class):
def __init__(self, timer_actor, arvados_client, cloud_client,
cloud_size, arvados_node=None,
retry_wait=1, max_retry_wait=180):
- super(ComputeNodeSetupActor, self).__init__()
- self._timer = timer_actor
+ super(ComputeNodeSetupActor, self).__init__(
+ 'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
self._arvados = arvados_client
self._cloud = cloud_client
- self._later = self.actor_ref.proxy()
- self._logger = logging.getLogger('arvnodeman.nodeup')
self.cloud_size = cloud_size
- self.subscribers = set()
- self.min_retry_wait = retry_wait
- self.max_retry_wait = max_retry_wait
- self.retry_wait = retry_wait
self.arvados_node = None
self.cloud_node = None
if arvados_node is None:
@@ -144,12 +170,12 @@ class ComputeNodeSetupActor(config.actor_class):
else:
self._later.prepare_arvados_node(arvados_node)
- @_retry(config.ARVADOS_ERRORS)
+ @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
def create_arvados_node(self):
self.arvados_node = self._arvados.nodes().create(body={}).execute()
self._later.create_cloud_node()
- @_retry(config.ARVADOS_ERRORS)
+ @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
def prepare_arvados_node(self, node):
self.arvados_node = self._arvados.nodes().update(
uuid=node['uuid'],
@@ -163,52 +189,38 @@ class ComputeNodeSetupActor(config.actor_class):
).execute()
self._later.create_cloud_node()
- @_retry(config.CLOUD_ERRORS)
+ @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
def create_cloud_node(self):
self._logger.info("Creating cloud node with size %s.",
self.cloud_size.name)
self.cloud_node = self._cloud.create_node(self.cloud_size,
self.arvados_node)
self._logger.info("Cloud node %s created.", self.cloud_node.id)
- _notify_subscribers(self._later, self.subscribers)
- self.subscribers = None
+ self._finished()
def stop_if_no_cloud_node(self):
if self.cloud_node is None:
self.stop()
- def subscribe(self, subscriber):
- if self.subscribers is None:
- try:
- subscriber(self._later)
- except pykka.ActorDeadError:
- pass
- else:
- self.subscribers.add(subscriber)
-
-class ComputeNodeShutdownActor(config.actor_class):
+class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
"""Actor to shut down a compute node.
This actor simply destroys a cloud node, retrying as needed.
"""
def __init__(self, timer_actor, cloud_client, cloud_node,
retry_wait=1, max_retry_wait=180):
- super(ComputeNodeShutdownActor, self).__init__()
- self._timer = timer_actor
+ super(ComputeNodeShutdownActor, self).__init__(
+ 'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
self._cloud = cloud_client
- self._later = self.actor_ref.proxy()
- self._logger = logging.getLogger('arvnodeman.nodedown')
self.cloud_node = cloud_node
- self.min_retry_wait = retry_wait
- self.max_retry_wait = max_retry_wait
- self.retry_wait = retry_wait
self._later.shutdown_node()
- @_retry(config.CLOUD_ERRORS)
+ @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
def shutdown_node(self):
self._cloud.destroy_node(self.cloud_node)
self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
+ self._finished()
class ComputeNodeUpdateActor(config.actor_class):
diff --git a/services/nodemanager/tests/test_computenode.py b/services/nodemanager/tests/test_computenode.py
index 477e20e..57a86fd 100644
--- a/services/nodemanager/tests/test_computenode.py
+++ b/services/nodemanager/tests/test_computenode.py
@@ -114,6 +114,14 @@ class ComputeNodeShutdownActorTestCase(testutil.ActorTestMixin,
self.stop_proxy(self.shutdown_actor)
self.assertTrue(self.cloud_client.destroy_node.called)
+ def test_late_subscribe(self):
+ self.make_actor()
+ subscriber = mock.Mock(name='subscriber_mock')
+ self.shutdown_actor.subscribe(subscriber).get(self.TIMEOUT)
+ self.stop_proxy(self.shutdown_actor)
+ self.assertEqual(self.shutdown_actor.actor_ref.actor_urn,
+ subscriber.call_args[0][0].actor_ref.actor_urn)
+
class ComputeNodeUpdateActorTestCase(testutil.ActorTestMixin,
unittest.TestCase):
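
For context on how the refactored base class is meant to be reused: a
subclass wraps its remote call in the shared _retry decorator and calls
_finished() to notify subscribers when done. A hypothetical example
(ComputeNodeRebootActor and reboot_node are illustrative only, not part
of this commit):

    # Hypothetical subclass mirroring ComputeNodeShutdownActor above.
    class ComputeNodeRebootActor(ComputeNodeStateChangeBase):
        """Actor to reboot a compute node, retrying as needed."""
        def __init__(self, timer_actor, cloud_client, cloud_node,
                     retry_wait=1, max_retry_wait=180):
            super(ComputeNodeRebootActor, self).__init__(
                'arvnodeman.nodereboot', timer_actor, retry_wait, max_retry_wait)
            self._cloud = cloud_client
            self.cloud_node = cloud_node
            self._later.reboot_node()

        @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
        def reboot_node(self):
            # Retries with exponential backoff on any of config.CLOUD_ERRORS,
            # then notifies subscribers once the reboot call succeeds.
            self._cloud.reboot_node(self.cloud_node)
            self._logger.info("Cloud node %s rebooted.", self.cloud_node.id)
            self._finished()

Both ComputeNodeSetupActor and ComputeNodeShutdownActor in the diff
above now follow exactly this pattern.
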
-----------------------------------------------------------------------
hooks/post-receive