[ARVADOS] updated: a7a16338702965de3ad0687470ef5beb2f42759b

Git user git at public.curoverse.com
Mon May 16 10:29:55 EDT 2016


Summary of changes:
 .../arvnodeman/computenode/dispatch/__init__.py    |  47 ++++---
 services/nodemanager/arvnodeman/daemon.py          | 145 +++++++++------------
 .../nodemanager/tests/test_computenode_dispatch.py |  17 ++-
 services/nodemanager/tests/test_daemon.py          |  10 +-
 4 files changed, 101 insertions(+), 118 deletions(-)

       via  a7a16338702965de3ad0687470ef5beb2f42759b (commit)
      from  c581b66ad0b54d2e57e4c92da9b04af0dbe4ac67 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit a7a16338702965de3ad0687470ef5beb2f42759b
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon May 16 10:29:50 2016 -0400

    9161: Decisions to start and stop compute nodes are now based on an explicit
    set of states: booting, unpaired, idle, busy, down, shutdown.  Refactor to
    remove the 'shutdowns' dict and fold it into cloud_nodes.  _nodes_wanted now
    uses the same computation of node state as the decision to shut down nodes.
    Nodes for which the state is unclear are treated as either idle (if in the
    boot grace period) or down (if older).

diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
index 8674f16..f9dbd20 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
@@ -339,17 +339,20 @@ class ComputeNodeMonitorActor(config.actor_class):
         self._last_log = msg
         self._logger.debug(msg, *args)
 
-    def in_state(self, *states):
-        # Return a boolean to say whether or not our Arvados node record is in
-        # one of the given states.  If state information is not
-        # available--because this node has no Arvados record, the record is
-        # stale, or the record has no state information--return None.
-        if (self.arvados_node is None) or not timestamp_fresh(
-              arvados_node_mtime(self.arvados_node), self.node_stale_after):
-            return None
+    def get_state(self):
+        """Get node state, one of ['unpaired', 'busy', 'idle', 'down']."""
+
+        # If this node is not associated with an Arvados node, return 'unpaired'.
+        if self.arvados_node is None:
+            return 'unpaired'
+
         state = self.arvados_node['crunch_worker_state']
-        if not state:
-            return None
+
+        # If state information is not available because it is missing or the
+        # record is stale, return 'down'.
+        if not state or not timestamp_fresh(arvados_node_mtime(self.arvados_node),
+                                            self.node_stale_after):
+            state = 'down'
 
         # There's a window between when a node pings for the first time and the
         # value of 'slurm_state' is synchronized by crunch-dispatch.  In this
@@ -368,11 +371,13 @@ class ComputeNodeMonitorActor(config.actor_class):
         if arvados_node_missing(self.arvados_node, self.node_stale_after):
             state = 'down'
 
-        result = state in states
-        if state == 'idle':
-            result = result and not self.arvados_node['job_uuid']
+        if state == 'idle' and self.arvados_node['job_uuid']:
+            state = 'busy'
+
+        return state
 
-        return result
+    def in_state(self, *states):
+        return self.get_state() in states
 
     def shutdown_eligible(self):
         """Determine if node is candidate for shut down.
@@ -389,18 +394,10 @@ class ComputeNodeMonitorActor(config.actor_class):
         # boot_grace = ["boot wait", "boot exceeded"]
         # idle_grace = ["not idle", "idle wait", "idle exceeded"]
 
-        if self.arvados_node is None:
-            crunch_worker_state = 'unpaired'
-        elif not timestamp_fresh(arvados_node_mtime(self.arvados_node), self.node_stale_after):
+        if self.arvados_node and not timestamp_fresh(arvados_node_mtime(self.arvados_node), self.node_stale_after):
             return (False, "node state is stale")
-        elif self.in_state('down'):
-            crunch_worker_state = 'down'
-        elif self.in_state('idle'):
-            crunch_worker_state = 'idle'
-        elif self.in_state('busy'):
-            crunch_worker_state = 'busy'
-        else:
-            return (False, "node is paired but crunch_worker_state is '%s'" % self.arvados_node['crunch_worker_state'])
+
+        crunch_worker_state = self.get_state()
 
         window = "open" if self._shutdowns.window_open() else "closed"
 
diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index 3ad2d43..589b9a1 100644
--- a/services/nodemanager/arvnodeman/daemon.py
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -19,7 +19,7 @@ class _ComputeNodeRecord(object):
         self.cloud_node = cloud_node
         self.arvados_node = arvados_node
         self.assignment_time = assignment_time
-
+        self.shutdown_actor = None
 
 class _BaseNodeTracker(object):
     def __init__(self):
@@ -143,9 +143,8 @@ class NodeManagerDaemonActor(actor_class):
             self.last_polls[poll_name] = -self.poll_stale_after
         self.cloud_nodes = _CloudNodeTracker()
         self.arvados_nodes = _ArvadosNodeTracker()
-        self.booting = {}       # Arvados node UUID to ComputeNodeSetupActors
-        self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
-        self.sizes_booting_shutdown = {} # Actor IDs or Cloud node IDs to node size
+        self.booting = {}       # Actor IDs to ComputeNodeSetupActors
+        self.sizes_booting = {} # Actor IDs to node size
 
     def on_start(self):
         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
@@ -201,15 +200,13 @@ class NodeManagerDaemonActor(actor_class):
 
         self.try_pairing()
 
-        for key, record in self.cloud_nodes.orphans.iteritems():
-            shutdown = key in self.shutdowns
-            if shutdown:
+        for record in self.cloud_nodes.orphans.itervalues():
+            if record.shutdown_actor:
                 try:
-                    self.shutdowns[key].stop().get()
+                    record.shutdown_actor.stop()
                 except pykka.ActorDeadError:
                     pass
-                del self.shutdowns[key]
-                del self.sizes_booting_shutdown[key]
+                record.shutdown_actor = None
 
             # A recently booted node is a node that successfully completed the
             # setup actor but has not yet appeared in the cloud node list.
@@ -245,86 +242,73 @@ class NodeManagerDaemonActor(actor_class):
     def _nodes_booting(self, size):
         s = sum(1
                 for c in self.booting.iterkeys()
-                if size is None or self.sizes_booting_shutdown[c].id == size.id)
+                if size is None or self.sizes_booting[c].id == size.id)
         return s
 
-    def _nodes_unpaired(self, size):
-        return sum(1
-                   for c in self.cloud_nodes.unpaired()
-                   if size is None or c.cloud_node.size.id == size.id)
-
-    def _nodes_down(self, size):
-        # Make sure to iterate over self.cloud_nodes because what we're
-        # counting here are compute nodes that are reported by the cloud
-        # provider but are considered "down" by Arvados.
-        return sum(1 for down in
-                   pykka.get_all(rec.actor.in_state('down') for rec in
-                                 self.cloud_nodes.paired()
-                                 if ((size is None or rec.cloud_node.size.id == size.id) and
-                                     rec.cloud_node.id not in self.shutdowns))
-                   if down)
-
-    def _nodes_size(self, size):
-        return sum(1
-                  for c in self.cloud_nodes.nodes.itervalues()
-                  if size is None or c.cloud_node.size.id == size.id)
-
-    def _nodes_up(self, size):
-        up = (self._nodes_booting(size) + self._nodes_size(size)) - (self._nodes_down(size) + self._size_shutdowns(size))
+    def _node_states(self, size):
+        states = pykka.get_all(rec.actor.get_state()
+                               for rec in self.cloud_nodes.nodes.itervalues()
+                               if ((size is None or rec.cloud_node.size.id == size.id) and
+                                   rec.shutdown_actor is None))
+        states += ['shutdown' for rec in self.cloud_nodes.nodes.itervalues()
+                   if ((size is None or rec.cloud_node.size.id == size.id) and
+                       rec.shutdown_actor is not None)]
+        return states
+
+    def _state_counts(self, size):
+        states = self._node_states(size)
+        counts = {
+            "booting": self._nodes_booting(size),
+            "unpaired": 0,
+            "busy": 0,
+            "idle": 0,
+            "down": 0,
+            "shutdown": 0
+        }
+        for s in states:
+            counts[s] = counts[s] + 1
+        return counts
+
+    def _nodes_up(self, counts):
+        up = counts["booting"] + counts["unpaired"] + counts["idle"] + counts["busy"]
         return up
 
     def _total_price(self):
         cost = 0
-        cost += sum(self.server_calculator.find_size(self.sizes_booting_shutdown[c].id).price
+        cost += sum(self.server_calculator.find_size(self.sizes_booting[c].id).price
                   for c in self.booting.iterkeys())
         cost += sum(self.server_calculator.find_size(c.cloud_node.size.id).price
                     for c in self.cloud_nodes.nodes.itervalues())
         return cost
 
-    def _nodes_busy(self, size):
-        return sum(1 for busy in
-                   pykka.get_all(rec.actor.in_state('busy') for rec in
-                                 self.cloud_nodes.nodes.itervalues()
-                                 if rec.cloud_node.size.id == size.id)
-                   if busy)
-
     def _size_wishlist(self, size):
         return sum(1 for c in self.last_wishlist if c.id == size.id)
 
-    def _size_shutdowns(self, size):
-        return sum(1
-                  for c in self.shutdowns.iterkeys()
-                  if size is None or self.sizes_booting_shutdown[c].id == size.id)
-
     def _nodes_wanted(self, size):
-        total_up_count = self._nodes_booting(None) + self._nodes_size(None)
-        under_min = self.min_nodes - total_up_count
-        over_max = total_up_count - self.max_nodes
+        total_node_count = self._nodes_booting(None) + len(self.cloud_nodes)
+        under_min = self.min_nodes - total_node_count
+        over_max = total_node_count - self.max_nodes
         total_price = self._total_price()
 
-        if over_max >= 0:
-            return -over_max
-        elif under_min > 0 and size.id == self.min_cloud_size.id:
-            return under_min
+        counts = self._state_counts(size)
 
-        up_count = self._nodes_up(size)
-        booting_count = self._nodes_booting(size)
-        total_count = self._nodes_size(size)
-        unpaired_count = self._nodes_unpaired(size)
-        busy_count = self._nodes_busy(size)
-        down_count = self._nodes_down(size)
-        shutdown_count = self._size_shutdowns(size)
-        idle_count = total_count - (unpaired_count+busy_count+down_count+shutdown_count)
+        up_count = self._nodes_up(counts)
+        busy_count = counts["busy"]
 
         self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.name,
                           self._size_wishlist(size),
                           up_count,
-                          booting_count,
-                          unpaired_count,
-                          idle_count,
+                          counts["booting"],
+                          counts["unpaired"],
+                          counts["idle"],
                           busy_count,
-                          down_count,
-                          shutdown_count)
+                          counts["down"],
+                          counts["shutdown"])
+
+        if over_max >= 0:
+            return -over_max
+        elif under_min > 0 and size.id == self.min_cloud_size.id:
+            return under_min
 
         wanted = self._size_wishlist(size) - (up_count - busy_count)
         if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
@@ -337,10 +321,11 @@ class NodeManagerDaemonActor(actor_class):
             return wanted
 
     def _nodes_excess(self, size):
-        up_count = self._nodes_up(size)
+        counts = self._state_counts(size)
+        up_count = self._nodes_up(counts)
         if size.id == self.min_cloud_size.id:
             up_count -= self.min_nodes
-        return up_count - (self._nodes_busy(size) + self._size_wishlist(size))
+        return up_count - (counts["busy"] + self._size_wishlist(size))
 
     def update_server_wishlist(self, wishlist):
         self._update_poll_time('server_wishlist')
@@ -387,7 +372,7 @@ class NodeManagerDaemonActor(actor_class):
             cloud_client=self._new_cloud(),
             cloud_size=cloud_size).proxy()
         self.booting[new_setup.actor_ref.actor_urn] = new_setup
-        self.sizes_booting_shutdown[new_setup.actor_ref.actor_urn] = cloud_size
+        self.sizes_booting[new_setup.actor_ref.actor_urn] = cloud_size
 
         if arvados_node is not None:
             self.arvados_nodes[arvados_node['uuid']].assignment_time = (
@@ -412,7 +397,7 @@ class NodeManagerDaemonActor(actor_class):
             cloud_node._nodemanager_recently_booted = True
             self._register_cloud_node(cloud_node)
         del self.booting[setup_proxy.actor_ref.actor_urn]
-        del self.sizes_booting_shutdown[setup_proxy.actor_ref.actor_urn]
+        del self.sizes_booting[setup_proxy.actor_ref.actor_urn]
 
     @_check_poll_freshness
     def stop_booting_node(self, size):
@@ -422,7 +407,7 @@ class NodeManagerDaemonActor(actor_class):
         for key, node in self.booting.iteritems():
             if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
                 del self.booting[key]
-                del self.sizes_booting_shutdown[key]
+                del self.sizes_booting[key]
 
                 if nodes_excess > 1:
                     self._later.stop_booting_node(size)
@@ -431,14 +416,14 @@ class NodeManagerDaemonActor(actor_class):
     def _begin_node_shutdown(self, node_actor, cancellable):
         cloud_node_obj = node_actor.cloud_node.get()
         cloud_node_id = cloud_node_obj.id
-        if cloud_node_id in self.shutdowns:
+        record = self.cloud_nodes[cloud_node_id]
+        if record.shutdown_actor is not None:
             return None
         shutdown = self._node_shutdown.start(
             timer_actor=self._timer, cloud_client=self._new_cloud(),
             arvados_client=self._new_arvados(),
             node_monitor=node_actor.actor_ref, cancellable=cancellable)
-        self.shutdowns[cloud_node_id] = shutdown.proxy()
-        self.sizes_booting_shutdown[cloud_node_id] = cloud_node_obj.size
+        record.shutdown_actor = shutdown.proxy()
         shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)
 
     @_check_poll_freshness
@@ -460,13 +445,12 @@ class NodeManagerDaemonActor(actor_class):
         cloud_node, success, cancel_reason = self._get_actor_attrs(
             shutdown_actor, 'cloud_node', 'success', 'cancel_reason')
         cloud_node_id = cloud_node.id
+        record = self.cloud_nodes[cloud_node_id]
         shutdown_actor.stop()
-
         if not success:
             if cancel_reason == self._node_shutdown.NODE_BROKEN:
                 self.cloud_nodes.blacklist(cloud_node_id)
-            del self.shutdowns[cloud_node_id]
-            del self.sizes_booting_shutdown[cloud_node_id]
+            record.shutdown_actor = None
         else:
             # If the node went from being booted to being shut down without ever
             # appearing in the cloud node list, it will have the
@@ -475,11 +459,6 @@ class NodeManagerDaemonActor(actor_class):
             if hasattr(self.cloud_nodes[cloud_node_id].cloud_node, "_nodemanager_recently_booted"):
                 del self.cloud_nodes[cloud_node_id].cloud_node._nodemanager_recently_booted
 
-        # On success, we want to leave the entry in self.shutdowns so that it
-        # won't try to shut down the node again.  It should disappear from the
-        # cloud node list, and the entry in self.shutdowns will get cleaned up
-        # by update_cloud_nodes.
-
     def shutdown(self):
         self._logger.info("Shutting down after signal.")
         self.poll_stale_after = -1  # Inhibit starting/stopping nodes
diff --git a/services/nodemanager/tests/test_computenode_dispatch.py b/services/nodemanager/tests/test_computenode_dispatch.py
index bf86e69..227b5e5 100644
--- a/services/nodemanager/tests/test_computenode_dispatch.py
+++ b/services/nodemanager/tests/test_computenode_dispatch.py
@@ -300,17 +300,24 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
 
     def test_in_state_when_unpaired(self):
         self.make_actor()
-        self.assertIsNone(self.node_state('idle', 'busy'))
+        self.assertTrue(self.node_state('unpaired'))
 
     def test_in_state_when_pairing_stale(self):
         self.make_actor(arv_node=testutil.arvados_node_mock(
                 job_uuid=None, age=90000))
-        self.assertIsNone(self.node_state('idle', 'busy'))
+        self.assertTrue(self.node_state('down'))
 
     def test_in_state_when_no_state_available(self):
         self.make_actor(arv_node=testutil.arvados_node_mock(
                 crunch_worker_state=None))
-        self.assertIsNone(self.node_state('idle', 'busy'))
+        print(self.node_actor.get_state().get())
+        self.assertTrue(self.node_state('idle'))
+
+    def test_in_state_when_no_state_available_old(self):
+        self.make_actor(arv_node=testutil.arvados_node_mock(
+                crunch_worker_state=None, age=90000))
+        print(self.node_actor.get_state().get())
+        self.assertTrue(self.node_state('down'))
 
     def test_in_idle_state(self):
         self.make_actor(2, arv_node=testutil.arvados_node_mock(job_uuid=None))
@@ -397,11 +404,11 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
         self.assertEquals((False, "node state is ('busy', 'open', 'boot wait', 'idle exceeded')"),
                           self.node_actor.shutdown_eligible().get(self.TIMEOUT))
 
-    def test_no_shutdown_when_node_state_unknown(self):
+    def test_shutdown_when_node_state_unknown(self):
         self.make_actor(5, testutil.arvados_node_mock(
             5, crunch_worker_state=None))
         self.shutdowns._set_state(True, 600)
-        self.assertEquals((False, "node is paired but crunch_worker_state is 'None'"),
+        self.assertEquals((True, "node state is ('idle', 'open', 'boot wait', 'idle exceeded')"),
                           self.node_actor.shutdown_eligible().get(self.TIMEOUT))
 
     def test_no_shutdown_when_node_state_stale(self):
diff --git a/services/nodemanager/tests/test_daemon.py b/services/nodemanager/tests/test_daemon.py
index b6557d0..fe7b0fe 100644
--- a/services/nodemanager/tests/test_daemon.py
+++ b/services/nodemanager/tests/test_daemon.py
@@ -215,8 +215,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         mock_node_monitor.proxy.return_value = mock.NonCallableMock(cloud_node=get_cloud_node)
         mock_shutdown = self.node_shutdown.start(node_monitor=mock_node_monitor)
 
-        self.daemon.shutdowns.get()[cloud_nodes[1].id] = mock_shutdown.proxy()
-        self.daemon.sizes_booting_shutdown.get()[cloud_nodes[1].id] = size
+        self.daemon.cloud_nodes.get()[cloud_nodes[1].id].shutdown_actor = mock_shutdown.proxy()
 
         self.assertEqual(2, self.alive_monitor_count())
         for mon_ref in self.monitor_list():
@@ -680,7 +679,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
             self.daemon.node_can_shutdown(c.actor)
 
         booting = self.daemon.booting.get()
-        shutdowns = self.daemon.shutdowns.get()
+        cloud_nodes = self.daemon.cloud_nodes.get()
 
         self.stop_proxy(self.daemon)
 
@@ -696,8 +695,9 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
 
         # shutting down a small node
         sizecounts = {a[0].id: 0 for a in avail_sizes}
-        for b in shutdowns.itervalues():
-            sizecounts[b.cloud_node.get().size.id] += 1
+        for b in cloud_nodes.nodes.itervalues():
+            if b.shutdown_actor is not None:
+                sizecounts[b.cloud_node.size.id] += 1
         self.assertEqual(1, sizecounts[small.id])
         self.assertEqual(0, sizecounts[big.id])
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list