[ARVADOS] updated: 1aabac3cbaf0d701696ffabceb992826abeea8c5

git at public.curoverse.com git at public.curoverse.com
Wed Oct 22 10:33:35 EDT 2014


Summary of changes:
 .../nodemanager/arvnodeman/computenode/__init__.py | 130 +++++++++++----------
 services/nodemanager/arvnodeman/daemon.py          |  23 +++-
 services/nodemanager/tests/test_computenode.py     |   8 ++
 services/nodemanager/tests/test_daemon.py          |  90 ++++++++++++--
 4 files changed, 177 insertions(+), 74 deletions(-)

       via  1aabac3cbaf0d701696ffabceb992826abeea8c5 (commit)
       via  8d2639525417aaa02240777454405f2249d505b0 (commit)
       via  ca1ddb913b35eaa4aea43a88501c5b7c6ab95e2c (commit)
      from  06686c47a0f99f5b5f292cde2d391024bf107514 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 1aabac3cbaf0d701696ffabceb992826abeea8c5
Merge: 06686c4 8d26395
Author: Brett Smith <brett at curoverse.com>
Date:   Wed Oct 22 10:35:21 2014 -0400

    Merge branch '4139-node-manager-race-fix-wip'
    
    Refs #4139, #4275.


commit 8d2639525417aaa02240777454405f2249d505b0
Author: Brett Smith <brett at curoverse.com>
Date:   Wed Oct 22 10:34:23 2014 -0400

    4139: Node Manager more closely tracks nodes it boots.
    
    When using Node Manager on EC2, we saw a race condition where a node
    would finish setting up, but would not be returned in node listings
    right away.  This adds a "booted" node state to the daemon, where a
    node booted by Node Manager is assumed to be up and running unless it
    disappears from the listing *after* being listed.  It's escalated to
    the normal workflow after it appears in a listing.

diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index 5b7437f..803e9ba 100644
--- a/services/nodemanager/arvnodeman/daemon.py
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -123,6 +123,7 @@ class NodeManagerDaemonActor(actor_class):
         self.cloud_nodes = _CloudNodeTracker()
         self.arvados_nodes = _ArvadosNodeTracker()
         self.booting = {}       # Actor IDs to ComputeNodeSetupActors
+        self.booted = {}        # Cloud node IDs to _ComputeNodeRecords
         self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
         self._logger.debug("Daemon initialized")
 
@@ -154,22 +155,24 @@ class NodeManagerDaemonActor(actor_class):
         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
                                              actor.update_cloud_node)
         record = _ComputeNodeRecord(actor, cloud_node)
-        self.cloud_nodes.add(record)
         return record
 
     def update_cloud_nodes(self, nodelist):
         self._update_poll_time('cloud_nodes')
         for key, node in self.cloud_nodes.update_from(nodelist):
             self._logger.info("Registering new cloud node %s", key)
-            record = self._new_node(node)
+            if key in self.booting:
+                record = self.booting.pop(key)
+            else:
+                record = self._new_node(node)
+            self.cloud_nodes.add(record)
             for arv_rec in self.arvados_nodes.unpaired():
                 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
                     self._pair_nodes(record, arv_rec.arvados_node)
                     break
         for key, record in self.cloud_nodes.orphans.iteritems():
             record.actor.stop()
-            if key in self.shutdowns:
-                self.shutdowns.pop(key).stop()
+            self.shutdowns.pop(key, None)
 
     def update_arvados_nodes(self, nodelist):
         self._update_poll_time('arvados_nodes')
@@ -185,7 +188,8 @@ class NodeManagerDaemonActor(actor_class):
                     break
 
     def _node_count(self):
-        up = sum(len(nodelist) for nodelist in [self.cloud_nodes, self.booting])
+        up = sum(len(nodelist) for nodelist in
+                 [self.cloud_nodes, self.booted, self.booting])
         return up - len(self.shutdowns)
 
     def _nodes_wanted(self):
@@ -253,6 +257,7 @@ class NodeManagerDaemonActor(actor_class):
         record = self.cloud_nodes.get(cloud_node.id)
         if record is None:
             record = self._new_node(cloud_node)
+            self.booted[cloud_node.id] = record
         self._pair_nodes(record, arvados_node)
 
     @_check_poll_freshness
@@ -279,6 +284,14 @@ class NodeManagerDaemonActor(actor_class):
                                              cloud_client=self._new_cloud(),
                                              cloud_node=cloud_node).proxy()
         self.shutdowns[cloud_node.id] = shutdown
+        shutdown.subscribe(self._later.node_finished_shutdown)
+
+    def node_finished_shutdown(self, shutdown_actor):
+        cloud_node_id = shutdown_actor.cloud_node.get().id
+        shutdown_actor.stop()
+        if cloud_node_id in self.booted:
+            self.booted.pop(cloud_node_id).actor.stop()
+            del self.shutdowns[cloud_node_id]
 
     def shutdown(self):
         self._logger.info("Shutting down after signal.")
diff --git a/services/nodemanager/tests/test_daemon.py b/services/nodemanager/tests/test_daemon.py
index 0a63222..a9ebf42 100644
--- a/services/nodemanager/tests/test_daemon.py
+++ b/services/nodemanager/tests/test_daemon.py
@@ -100,23 +100,93 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.stop_proxy(self.daemon)
         self.assertEqual(1, self.node_setup.start.call_count)
 
+    def mock_setup_actor(self, cloud_node, arv_node):
+        setup = mock.MagicMock(name='setup_node_mock')
+        setup.actor_ref = self.node_setup.start().proxy().actor_ref
+        self.node_setup.reset_mock()
+        setup.actor_urn = cloud_node.id
+        setup.cloud_node.get.return_value = cloud_node
+        setup.arvados_node.get.return_value = arv_node
+        return setup
+
+    def start_node_boot(self, cloud_node=None, arv_node=None, id_num=1):
+        if cloud_node is None:
+            cloud_node = testutil.cloud_node_mock(id_num)
+        if arv_node is None:
+            arv_node = testutil.arvados_node_mock(id_num)
+        self.make_daemon(want_sizes=[testutil.MockSize(id_num)])
+        self.daemon.max_nodes.get(self.TIMEOUT)
+        self.assertEqual(1, self.node_setup.start.call_count)
+        return self.mock_setup_actor(cloud_node, arv_node)
+
     def test_no_duplication_when_booting_node_listed_fast(self):
         # Test that we don't start two ComputeNodeMonitorActors when
         # we learn about a booting node through a listing before we
         # get the "node up" message from CloudNodeSetupActor.
         cloud_node = testutil.cloud_node_mock(1)
-        self.make_daemon(want_sizes=[testutil.MockSize(1)])
-        self.daemon.max_nodes.get(self.TIMEOUT)
-        self.assertEqual(1, self.node_setup.start.call_count)
-        setup = mock.MagicMock(name='setup_node_mock')
-        setup.actor_ref = self.node_setup.start().proxy().actor_ref
-        setup.cloud_node.get.return_value = cloud_node
-        setup.arvados_node.get.return_value = testutil.arvados_node_mock(1)
+        setup = self.start_node_boot(cloud_node)
         self.daemon.update_cloud_nodes([cloud_node]).get(self.TIMEOUT)
         self.assertTrue(self.node_factory.start.called)
         self.daemon.node_up(setup).get(self.TIMEOUT)
         self.assertEqual(1, self.node_factory.start.call_count)
 
+    def test_node_counted_after_boot_with_slow_listing(self):
+        # Test that, after we boot a compute node, we assume it exists
+        # even if it doesn't appear in the listing (e.g., because of delays
+        # propagating tags).
+        setup = self.start_node_boot()
+        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.assertTrue(self.node_factory.start.called,
+                        "daemon not monitoring booted node")
+        self.daemon.update_cloud_nodes([])
+        self.stop_proxy(self.daemon)
+        self.assertEqual(1, self.node_factory.start.call_count,
+                         "daemon has duplicate monitors for booted node")
+        self.assertFalse(self.node_factory.start().proxy().stop.called,
+                         "daemon prematurely stopped monitoring a new node")
+
+    def test_booted_unlisted_node_counted(self):
+        setup = self.start_node_boot(id_num=1)
+        self.daemon.node_up(setup)
+        self.daemon.update_server_wishlist(
+            [testutil.MockSize(1)]).get(self.TIMEOUT)
+        self.stop_proxy(self.daemon)
+        self.assertFalse(self.node_setup.start.called,
+                         "daemon did not count booted node toward wishlist")
+
+    def test_booted_node_can_shutdown(self):
+        setup = self.start_node_boot()
+        self.daemon.node_up(setup)
+        self.daemon.update_server_wishlist([])
+        self.daemon.node_can_shutdown(
+            self.node_factory.start().proxy()).get(self.TIMEOUT)
+        self.stop_proxy(self.daemon)
+        self.assertTrue(self.node_shutdown.start.called,
+                        "daemon did not shut down booted node on offer")
+
+    def test_booted_node_lifecycle(self):
+        cloud_node = testutil.cloud_node_mock(6)
+        setup = self.start_node_boot(cloud_node, id_num=6)
+        self.daemon.node_up(setup)
+        self.daemon.update_server_wishlist([])
+        monitor = self.node_factory.start().proxy()
+        monitor.cloud_node.get.return_value = cloud_node
+        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
+        self.assertTrue(self.node_shutdown.start.called,
+                        "daemon did not shut down booted node on offer")
+        shutdown = self.node_shutdown.start().proxy()
+        shutdown.cloud_node.get.return_value = cloud_node
+        self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
+        self.assertTrue(shutdown.stop.called,
+                        "shutdown actor not stopped after finishing")
+        self.assertTrue(monitor.stop.called,
+                        "monitor for booted node not stopped after shutdown")
+        self.daemon.update_server_wishlist(
+            [testutil.MockSize(2)]).get(self.TIMEOUT)
+        self.stop_proxy(self.daemon)
+        self.assertTrue(self.node_setup.start.called,
+                        "second node not started after booted node stopped")
+
     def test_booting_nodes_shut_down(self):
         self.make_daemon(want_sizes=[testutil.MockSize(1)])
         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
@@ -128,10 +198,10 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         cloud_node = testutil.cloud_node_mock(1)
         size = testutil.MockSize(1)
         self.make_daemon(cloud_nodes=[cloud_node], want_sizes=[size])
-        node_actor = self.node_factory().proxy()
-        self.daemon.node_can_shutdown(node_actor).get(self.TIMEOUT)
+        self.daemon.node_can_shutdown(
+            self.node_factory.start().proxy()).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
-        self.assertFalse(node_actor.shutdown.called)
+        self.assertFalse(self.node_shutdown.start.called)
 
     def test_shutdown_accepted_below_capacity(self):
         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])

commit ca1ddb913b35eaa4aea43a88501c5b7c6ab95e2c
Author: Brett Smith <brett at curoverse.com>
Date:   Wed Oct 22 10:02:33 2014 -0400

    4139: Node Manager supports subscribing to shutdown events.
    
    This is necessary to fix a race condition.  The daemon needs to be
    able to track nodes that it has booted even if they don't appear in
    listings.

diff --git a/services/nodemanager/arvnodeman/computenode/__init__.py b/services/nodemanager/arvnodeman/computenode/__init__.py
index ae25428..0d4ee7b 100644
--- a/services/nodemanager/arvnodeman/computenode/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/__init__.py
@@ -23,33 +23,6 @@ def arvados_node_mtime(node):
 def timestamp_fresh(timestamp, fresh_time):
     return (time.time() - timestamp) < fresh_time
 
-def _retry(errors):
-    """Retry decorator for an actor method that makes remote requests.
-
-    Use this function to decorator an actor method, and pass in a tuple of
-    exceptions to catch.  This decorator will schedule retries of that method
-    with exponential backoff if the original method raises any of the given
-    errors.
-    """
-    def decorator(orig_func):
-        @functools.wraps(orig_func)
-        def wrapper(self, *args, **kwargs):
-            try:
-                orig_func(self, *args, **kwargs)
-            except errors as error:
-                self._logger.warning(
-                    "Client error: %s - waiting %s seconds",
-                    error, self.retry_wait)
-                self._timer.schedule(self.retry_wait,
-                                     getattr(self._later, orig_func.__name__),
-                                     *args, **kwargs)
-                self.retry_wait = min(self.retry_wait * 2,
-                                      self.max_retry_wait)
-            else:
-                self.retry_wait = self.min_retry_wait
-        return wrapper
-    return decorator
-
 class BaseComputeNodeDriver(object):
     """Abstract base class for compute node drivers.
 
@@ -114,7 +87,66 @@ class BaseComputeNodeDriver(object):
 
 ComputeNodeDriverClass = BaseComputeNodeDriver
 
-class ComputeNodeSetupActor(config.actor_class):
+class ComputeNodeStateChangeBase(config.actor_class):
+    """Base class for actors that change a compute node's state.
+
+    This base class takes care of retrying changes and notifying
+    subscribers when the change is finished.
+    """
+    def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait):
+        super(ComputeNodeStateChangeBase, self).__init__()
+        self._later = self.actor_ref.proxy()
+        self._timer = timer_actor
+        self._logger = logging.getLogger(logger_name)
+        self.min_retry_wait = retry_wait
+        self.max_retry_wait = max_retry_wait
+        self.retry_wait = retry_wait
+        self.subscribers = set()
+
+    @staticmethod
+    def _retry(errors):
+        """Retry decorator for an actor method that makes remote requests.
+
+        Use this function to decorate an actor method, and pass in a
+        tuple of exceptions to catch.  This decorator will schedule
+        retries of that method with exponential backoff if the
+        original method raises any of the given errors.
+        """
+        def decorator(orig_func):
+            @functools.wraps(orig_func)
+            def wrapper(self, *args, **kwargs):
+                try:
+                    orig_func(self, *args, **kwargs)
+                except errors as error:
+                    self._logger.warning(
+                        "Client error: %s - waiting %s seconds",
+                        error, self.retry_wait)
+                    self._timer.schedule(self.retry_wait,
+                                         getattr(self._later,
+                                                 orig_func.__name__),
+                                         *args, **kwargs)
+                    self.retry_wait = min(self.retry_wait * 2,
+                                          self.max_retry_wait)
+                else:
+                    self.retry_wait = self.min_retry_wait
+            return wrapper
+        return decorator
+
+    def _finished(self):
+        _notify_subscribers(self._later, self.subscribers)
+        self.subscribers = None
+
+    def subscribe(self, subscriber):
+        if self.subscribers is None:
+            try:
+                subscriber(self._later)
+            except pykka.ActorDeadError:
+                pass
+        else:
+            self.subscribers.add(subscriber)
+
+
+class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
     """Actor to create and set up a cloud compute node.
 
     This actor prepares an Arvados node record for a new compute node
@@ -126,17 +158,11 @@ class ComputeNodeSetupActor(config.actor_class):
     def __init__(self, timer_actor, arvados_client, cloud_client,
                  cloud_size, arvados_node=None,
                  retry_wait=1, max_retry_wait=180):
-        super(ComputeNodeSetupActor, self).__init__()
-        self._timer = timer_actor
+        super(ComputeNodeSetupActor, self).__init__(
+            'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
         self._arvados = arvados_client
         self._cloud = cloud_client
-        self._later = self.actor_ref.proxy()
-        self._logger = logging.getLogger('arvnodeman.nodeup')
         self.cloud_size = cloud_size
-        self.subscribers = set()
-        self.min_retry_wait = retry_wait
-        self.max_retry_wait = max_retry_wait
-        self.retry_wait = retry_wait
         self.arvados_node = None
         self.cloud_node = None
         if arvados_node is None:
@@ -144,12 +170,12 @@ class ComputeNodeSetupActor(config.actor_class):
         else:
             self._later.prepare_arvados_node(arvados_node)
 
-    @_retry(config.ARVADOS_ERRORS)
+    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
     def create_arvados_node(self):
         self.arvados_node = self._arvados.nodes().create(body={}).execute()
         self._later.create_cloud_node()
 
-    @_retry(config.ARVADOS_ERRORS)
+    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
     def prepare_arvados_node(self, node):
         self.arvados_node = self._arvados.nodes().update(
             uuid=node['uuid'],
@@ -163,52 +189,38 @@ class ComputeNodeSetupActor(config.actor_class):
             ).execute()
         self._later.create_cloud_node()
 
-    @_retry(config.CLOUD_ERRORS)
+    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
     def create_cloud_node(self):
         self._logger.info("Creating cloud node with size %s.",
                           self.cloud_size.name)
         self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                   self.arvados_node)
         self._logger.info("Cloud node %s created.", self.cloud_node.id)
-        _notify_subscribers(self._later, self.subscribers)
-        self.subscribers = None
+        self._finished()
 
     def stop_if_no_cloud_node(self):
         if self.cloud_node is None:
             self.stop()
 
-    def subscribe(self, subscriber):
-        if self.subscribers is None:
-            try:
-                subscriber(self._later)
-            except pykka.ActorDeadError:
-                pass
-        else:
-            self.subscribers.add(subscriber)
-
 
-class ComputeNodeShutdownActor(config.actor_class):
+class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
     """Actor to shut down a compute node.
 
     This actor simply destroys a cloud node, retrying as needed.
     """
     def __init__(self, timer_actor, cloud_client, cloud_node,
                  retry_wait=1, max_retry_wait=180):
-        super(ComputeNodeShutdownActor, self).__init__()
-        self._timer = timer_actor
+        super(ComputeNodeShutdownActor, self).__init__(
+            'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
         self._cloud = cloud_client
-        self._later = self.actor_ref.proxy()
-        self._logger = logging.getLogger('arvnodeman.nodedown')
         self.cloud_node = cloud_node
-        self.min_retry_wait = retry_wait
-        self.max_retry_wait = max_retry_wait
-        self.retry_wait = retry_wait
         self._later.shutdown_node()
 
-    @_retry(config.CLOUD_ERRORS)
+    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
     def shutdown_node(self):
         self._cloud.destroy_node(self.cloud_node)
         self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
+        self._finished()
 
 
 class ComputeNodeUpdateActor(config.actor_class):
diff --git a/services/nodemanager/tests/test_computenode.py b/services/nodemanager/tests/test_computenode.py
index 477e20e..57a86fd 100644
--- a/services/nodemanager/tests/test_computenode.py
+++ b/services/nodemanager/tests/test_computenode.py
@@ -114,6 +114,14 @@ class ComputeNodeShutdownActorTestCase(testutil.ActorTestMixin,
         self.stop_proxy(self.shutdown_actor)
         self.assertTrue(self.cloud_client.destroy_node.called)
 
+    def test_late_subscribe(self):
+        self.make_actor()
+        subscriber = mock.Mock(name='subscriber_mock')
+        self.shutdown_actor.subscribe(subscriber).get(self.TIMEOUT)
+        self.stop_proxy(self.shutdown_actor)
+        self.assertEqual(self.shutdown_actor.actor_ref.actor_urn,
+                         subscriber.call_args[0][0].actor_ref.actor_urn)
+
 
 class ComputeNodeUpdateActorTestCase(testutil.ActorTestMixin,
                                      unittest.TestCase):

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list