[ARVADOS] updated: a359a8e554dcf343198a04c8297be384fa824a2f

git at public.curoverse.com
Wed Oct 8 09:10:23 EDT 2014


Summary of changes:
 .../nodemanager/arvnodeman/computenode/__init__.py |  14 +-
 services/nodemanager/arvnodeman/daemon.py          | 230 +++++++++++----------
 services/nodemanager/arvnodeman/jobqueue.py        |  17 +-
 services/nodemanager/arvnodeman/launcher.py        |  10 +-
 services/nodemanager/tests/test_computenode.py     |   5 +-
 services/nodemanager/tests/test_daemon.py          |  10 +-
 6 files changed, 147 insertions(+), 139 deletions(-)

       via  a359a8e554dcf343198a04c8297be384fa824a2f (commit)
      from  e1a67ca409408d063f8de92438a52957c2c9644c (commit)

The revisions listed above that are new to this repository have not
appeared in any other notification email, so we list them in full
below.


commit a359a8e554dcf343198a04c8297be384fa824a2f
Author: Brett Smith <brett at curoverse.com>
Date:   Wed Oct 8 09:12:17 2014 -0400

    2881: Readability improvements suggested by code review.

diff --git a/services/nodemanager/arvnodeman/computenode/__init__.py b/services/nodemanager/arvnodeman/computenode/__init__.py
index e5efa3d..ae25428 100644
--- a/services/nodemanager/arvnodeman/computenode/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/__init__.py
@@ -214,11 +214,11 @@ class ComputeNodeShutdownActor(config.actor_class):
 class ComputeNodeUpdateActor(config.actor_class):
     """Actor to dispatch one-off cloud management requests.
 
-    This actor receives requests for small cloud updates, and dispatches them
-    to a real driver.  ComputeNodeActors use this to perform maintenance
-    tasks on themselves.  Having a dedicated actor for this gives us the
-    opportunity to control the flow of requests; e.g., by backing off when
-    errors occur.
+    This actor receives requests for small cloud updates, and
+    dispatches them to a real driver.  ComputeNodeMonitorActors use
+    this to perform maintenance tasks on themselves.  Having a
+    dedicated actor for this gives us the opportunity to control the
+    flow of requests; e.g., by backing off when errors occur.
 
     This actor is most like a "traditional" Pykka actor: there's no
     subscribing, but instead methods return real driver results.  If
@@ -299,7 +299,7 @@ class ShutdownTimer(object):
         return 0 < (time.time() - self._open_start) < self._open_for
 
 
-class ComputeNodeActor(config.actor_class):
+class ComputeNodeMonitorActor(config.actor_class):
     """Actor to manage a running compute node.
 
     This actor gets updates about a compute node's cloud and Arvados records.
@@ -309,7 +309,7 @@ class ComputeNodeActor(config.actor_class):
     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
                  timer_actor, update_actor, arvados_node=None,
                  poll_stale_after=600, node_stale_after=3600):
-        super(ComputeNodeActor, self).__init__()
+        super(ComputeNodeMonitorActor, self).__init__()
         self._later = self.actor_ref.proxy()
         self._logger = logging.getLogger('arvnodeman.computenode')
         self._last_log = None
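
The rename above is mechanical, but note that the super() call inside
__init__ must be updated to name the new class as well, which is why the
diff touches both lines together. A minimal, self-contained sketch of the
pattern (using pykka.ThreadingActor directly rather than the real
config.actor_class; class and method names here are illustrative, not
Arvados code):

    import pykka

    class ComputeNodeMonitorSketch(pykka.ThreadingActor):
        """Stand-in for the renamed ComputeNodeMonitorActor."""
        def __init__(self):
            # After a class rename, this super() call must name the
            # new class too.
            super(ComputeNodeMonitorSketch, self).__init__()
            # Like the real actor, keep a proxy to ourselves so
            # follow-up work goes through our own mailbox.
            self._later = self.actor_ref.proxy()

        def ping(self):
            return 'pong'

    if __name__ == '__main__':
        proxy = ComputeNodeMonitorSketch.start().proxy()
        print(proxy.ping().get())   # => pong
        pykka.ActorRegistry.stop_all()
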
diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index ff28cc4..5b7437f 100644
--- a/services/nodemanager/arvnodeman/daemon.py
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -11,49 +11,86 @@ import pykka
 from . import computenode as cnode
 from .config import actor_class
 
-class NodeManagerDaemonActor(actor_class):
-    """Node Manager daemon.
+class _ComputeNodeRecord(object):
+    def __init__(self, actor=None, cloud_node=None, arvados_node=None,
+                 assignment_time=float('-inf')):
+        self.actor = actor
+        self.cloud_node = cloud_node
+        self.arvados_node = arvados_node
+        self.assignment_time = assignment_time
 
-    This actor subscribes to all information polls about cloud nodes,
-    Arvados nodes, and the job queue.  It creates a ComputeNodeActor
-    for every cloud node, subscribing them to poll updates
-    appropriately, and starts and stops cloud nodes based on job queue
-    demand.
-    """
-    class PairingTracker(object):
-        def __init__(self, key_func, paired_items, unpaired_items):
-            self.key_func = key_func
-            self._paired_items = paired_items
-            self._unpaired_items = unpaired_items
-
-        def all_items(self, response):
-            self.unseen = set(self._paired_items.iterkeys())
-            self.unseen.update(self._unpaired_items.iterkeys())
-            for item in response:
-                key = self.key_func(item)
+
+class _BaseNodeTracker(object):
+    def __init__(self):
+        self.nodes = {}
+        self.orphans = {}
+
+    def __getitem__(self, key):
+        return self.nodes[key]
+
+    def __len__(self):
+        return len(self.nodes)
+
+    def get(self, key, default=None):
+        return self.nodes.get(key, default)
+
+    def record_key(self, record):
+        return self.item_key(getattr(record, self.RECORD_ATTR))
+
+    def add(self, record):
+        self.nodes[self.record_key(record)] = record
+
+    def update_record(self, key, item):
+        setattr(self.nodes[key], self.RECORD_ATTR, item)
+
+    def update_from(self, response):
+        unseen = set(self.nodes.iterkeys())
+        for item in response:
+            key = self.item_key(item)
+            if key in unseen:
+                unseen.remove(key)
+                self.update_record(key, item)
+            else:
                 yield key, item
-                if key in self.unseen:
-                    self.unseen.remove(key)
+        self.orphans = {key: self.nodes.pop(key) for key in unseen}
+
+    def unpaired(self):
+        return (record for record in self.nodes.itervalues()
+                if getattr(record, self.PAIR_ATTR) is None)
+
+
+class _CloudNodeTracker(_BaseNodeTracker):
+    RECORD_ATTR = 'cloud_node'
+    PAIR_ATTR = 'arvados_node'
+    item_key = staticmethod(lambda cloud_node: cloud_node.id)
 
-        def new_items(self, response):
-            for key, item in self.all_items(response):
-                if key not in self.unseen:
-                    yield key, item
 
-        def unpaired_items(self, response):
-            for key, item in self.all_items(response):
-                if key not in self._paired_items:
-                    yield key, item
+class _ArvadosNodeTracker(_BaseNodeTracker):
+    RECORD_ATTR = 'arvados_node'
+    PAIR_ATTR = 'cloud_node'
+    item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
 
-        def unseen_items(self):
-            for key in self.unseen:
-                if key in self._paired_items:
-                    home_dict = self._paired_items
-                else:
-                    home_dict = self._unpaired_items
-                yield home_dict, key
+    def find_stale_node(self, stale_time):
+        for record in self.nodes.itervalues():
+            node = record.arvados_node
+            if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
+                                          stale_time) and
+                  not cnode.timestamp_fresh(record.assignment_time,
+                                            stale_time)):
+                return node
+        return None
 
 
+class NodeManagerDaemonActor(actor_class):
+    """Node Manager daemon.
+
+    This actor subscribes to all information polls about cloud nodes,
+    Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
+    for every cloud node, subscribing them to poll updates
+    appropriately.  It creates and destroys cloud nodes based on job queue
+    demand, and stops the corresponding ComputeNode actors when their work
+    is done.
+    """
     def __init__(self, server_wishlist_actor, arvados_nodes_actor,
                  cloud_nodes_actor, cloud_update_actor, timer_actor,
                  arvados_factory, cloud_factory,
@@ -61,7 +98,7 @@ class NodeManagerDaemonActor(actor_class):
                  poll_stale_after=600, node_stale_after=7200,
                  node_setup_class=cnode.ComputeNodeSetupActor,
                  node_shutdown_class=cnode.ComputeNodeShutdownActor,
-                 node_actor_class=cnode.ComputeNodeActor):
+                 node_actor_class=cnode.ComputeNodeMonitorActor):
         super(NodeManagerDaemonActor, self).__init__()
         self._node_setup = node_setup_class
         self._node_shutdown = node_shutdown_class
@@ -83,12 +120,8 @@ class NodeManagerDaemonActor(actor_class):
             poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
             setattr(self, '_{}_actor'.format(poll_name), poll_actor)
             self.last_polls[poll_name] = -self.poll_stale_after
-        # Map cloud node IDs, or Arvados node UUIDs, to their ComputeNodeActors.
-        self.unpaired_clouds = {}
-        self.paired_clouds = {}
-        self.paired_arv = {}
-        self.unpaired_arv = {}  # Arvados node UUIDs to full node data
-        self.assigned_arv = {}  # Arvados node UUIDs to assignment timestamps
+        self.cloud_nodes = _CloudNodeTracker()
+        self.arvados_nodes = _ArvadosNodeTracker()
         self.booting = {}       # Actor IDs to ComputeNodeSetupActors
         self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
         self._logger.debug("Daemon initialized")
@@ -96,19 +129,15 @@ class NodeManagerDaemonActor(actor_class):
     def _update_poll_time(self, poll_key):
         self.last_polls[poll_key] = time.time()
 
-    def _pair_nodes(self, cloud_key, arv_key, actor=None):
-        if actor is None:
-            actor = self.unpaired_clouds[cloud_key]
+    def _pair_nodes(self, node_record, arvados_node):
         self._logger.info("Cloud node %s has associated with Arvados node %s",
-                          cloud_key, arv_key)
-        self.paired_clouds[cloud_key] = actor
-        self.paired_arv[arv_key] = actor
-        self._arvados_nodes_actor.subscribe_to(arv_key,
-                                               actor.update_arvados_node)
-        self.unpaired_clouds.pop(cloud_key, None)
-        self.unpaired_arv.pop(arv_key, None)
-
-    def _new_node(self, cloud_node, arvados_node=None):
+                          node_record.cloud_node.id, arvados_node['uuid'])
+        self._arvados_nodes_actor.subscribe_to(
+            arvados_node['uuid'], node_record.actor.update_arvados_node)
+        node_record.arvados_node = arvados_node
+        self.arvados_nodes.add(node_record)
+
+    def _new_node(self, cloud_node):
         start_time = self._cloud_driver.node_start_time(cloud_node)
         shutdown_timer = cnode.ShutdownTimer(start_time,
                                              self.shutdown_windows)
@@ -118,53 +147,45 @@ class NodeManagerDaemonActor(actor_class):
             shutdown_timer=shutdown_timer,
             update_actor=self._cloud_updater,
             timer_actor=self._timer,
-            arvados_node=arvados_node,
+            arvados_node=None,
             poll_stale_after=self.poll_stale_after,
             node_stale_after=self.node_stale_after).proxy()
-        actor.subscribe(self._later.shutdown_offer)
+        actor.subscribe(self._later.node_can_shutdown)
         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
                                              actor.update_cloud_node)
-        if arvados_node is not None:
-            self._pair_nodes(cloud_node.id, arvados_node['uuid'], actor)
-        return actor
+        record = _ComputeNodeRecord(actor, cloud_node)
+        self.cloud_nodes.add(record)
+        return record
 
     def update_cloud_nodes(self, nodelist):
         self._update_poll_time('cloud_nodes')
-        pairs = self.PairingTracker(lambda n: n.id,
-                                    self.paired_clouds, self.unpaired_clouds)
-        for key, node in pairs.new_items(nodelist):
-            actor = self._new_node(node)
-            for arv_key, arv_node in self.unpaired_arv.iteritems():
-                if actor.offer_arvados_pair(arv_node).get():
-                    self._pair_nodes(key, arv_key, actor)
+        for key, node in self.cloud_nodes.update_from(nodelist):
+            self._logger.info("Registering new cloud node %s", key)
+            record = self._new_node(node)
+            for arv_rec in self.arvados_nodes.unpaired():
+                if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
+                    self._pair_nodes(record, arv_rec.arvados_node)
                     break
-            else:
-                self._logger.info("Registering new cloud node %s", key)
-                self.unpaired_clouds[key] = actor
-        for source, key in pairs.unseen_items():
-            source.pop(key).stop()
+        for key, record in self.cloud_nodes.orphans.iteritems():
+            record.actor.stop()
             if key in self.shutdowns:
                 self.shutdowns.pop(key).stop()
 
     def update_arvados_nodes(self, nodelist):
         self._update_poll_time('arvados_nodes')
-        pairs = self.PairingTracker(lambda n: n['uuid'],
-                                    self.paired_arv, self.unpaired_arv)
-        for key, node in pairs.unpaired_items(nodelist):
-            if key not in self.unpaired_arv:
-                self._logger.info("Registering new Arvados node %s", key)
-            self.unpaired_arv[key] = node
-            for cloud_key, actor in self.unpaired_clouds.iteritems():
-                if actor.offer_arvados_pair(node).get():
-                    self._pair_nodes(cloud_key, key, actor)
+        for key, node in self.arvados_nodes.update_from(nodelist):
+            self._logger.info("Registering new Arvados node %s", key)
+            record = _ComputeNodeRecord(arvados_node=node)
+            self.arvados_nodes.add(record)
+        for arv_rec in self.arvados_nodes.unpaired():
+            arv_node = arv_rec.arvados_node
+            for cloud_rec in self.cloud_nodes.unpaired():
+                if cloud_rec.actor.offer_arvados_pair(arv_node).get():
+                    self._pair_nodes(cloud_rec, arv_node)
                     break
-        for source, key in pairs.unseen_items():
-            if source is self.unpaired_arv:
-                del self.unpaired_arv[key]
 
     def _node_count(self):
-        up = sum(len(nodelist) for nodelist in
-                 [self.paired_clouds, self.unpaired_clouds, self.booting])
+        up = sum(len(nodelist) for nodelist in [self.cloud_nodes, self.booting])
         return up - len(self.shutdowns)
 
     def _nodes_wanted(self):
@@ -199,35 +220,25 @@ class NodeManagerDaemonActor(actor_class):
                 return None
         return wrapper
 
-    def _find_reusable_arvados_node(self):
-        for node in self.unpaired_arv.itervalues():
-            assigned_at = self.assigned_arv.get(node['uuid'],
-                                                -self.node_stale_after)
-            if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
-                                          self.node_stale_after) and
-                not cnode.timestamp_fresh(assigned_at,
-                                          self.node_stale_after)):
-                return node
-        return None
-
     @_check_poll_freshness
     def start_node(self):
         nodes_wanted = self._nodes_wanted()
         if nodes_wanted < 1:
             return None
-        arvados_node = self._find_reusable_arvados_node()
-        size = self.last_wishlist[nodes_wanted - 1]
+        arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
+        cloud_size = self.last_wishlist[nodes_wanted - 1]
         self._logger.info("Want %s more nodes.  Booting a %s node.",
-                          nodes_wanted, size.name)
+                          nodes_wanted, cloud_size.name)
         new_setup = self._node_setup.start(
             timer_actor=self._timer,
             arvados_client=self._new_arvados(),
             arvados_node=arvados_node,
             cloud_client=self._new_cloud(),
-            cloud_size=size).proxy()
+            cloud_size=cloud_size).proxy()
         self.booting[new_setup.actor_ref.actor_urn] = new_setup
         if arvados_node is not None:
-            self.assigned_arv[arvados_node['uuid']] = time.time()
+            self.arvados_nodes[arvados_node['uuid']].assignment_time = (
+                time.time())
         new_setup.subscribe(self._later.node_up)
         if nodes_wanted > 1:
             self._later.start_node()
@@ -237,17 +248,12 @@ class NodeManagerDaemonActor(actor_class):
 
     def node_up(self, setup_proxy):
         cloud_node, arvados_node = self._actor_nodes(setup_proxy)
-        cloud_key = cloud_node.id
-        arv_key = arvados_node['uuid']
         del self.booting[setup_proxy.actor_ref.actor_urn]
-        self.assigned_arv.pop(arv_key, None)
-        if cloud_key in self.unpaired_clouds:
-            if self.unpaired_clouds[cloud_key].offer_arvados_pair(
-                  arvados_node).get():
-                self._pair_nodes(cloud_key, arv_key)
-        elif cloud_key not in self.paired_clouds:
-            self._new_node(cloud_node, arvados_node)
         setup_proxy.stop()
+        record = self.cloud_nodes.get(cloud_node.id)
+        if record is None:
+            record = self._new_node(cloud_node)
+        self._pair_nodes(record, arvados_node)
 
     @_check_poll_freshness
     def stop_booting_node(self):
@@ -263,7 +269,7 @@ class NodeManagerDaemonActor(actor_class):
                 break
 
     @_check_poll_freshness
-    def shutdown_offer(self, node_actor):
+    def node_can_shutdown(self, node_actor):
         if self._nodes_excess() < 1:
             return None
         cloud_node, arvados_node = self._actor_nodes(node_actor)
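
The tracker classes introduced above replace four parallel dictionaries
with one record object per node family. The key idea is update_from():
a single pass over a poll response refreshes known records, yields
brand-new items to the caller, and collects vanished records as orphans.
A simplified, self-contained sketch of that pattern (not the real
module; key_func stands in for the trackers' item_key):

    class TrackerSketch(object):
        def __init__(self):
            self.nodes = {}    # key -> last known record
            self.orphans = {}  # records missing from the latest poll

        def update_from(self, response, key_func):
            unseen = set(self.nodes)
            for item in response:
                key = key_func(item)
                if key in unseen:
                    unseen.remove(key)
                    self.nodes[key] = item  # refresh an existing record
                else:
                    yield key, item         # caller registers new items
            # Only populated once the caller exhausts the generator,
            # which is why the daemon drives it with a full for loop.
            self.orphans = dict((key, self.nodes.pop(key))
                                for key in unseen)

    tracker = TrackerSketch()
    tracker.nodes = {'a': 'old-a', 'b': 'old-b'}
    fresh = list(tracker.update_from(['a', 'c'], key_func=lambda i: i))
    print(fresh)            # [('c', 'c')]   -- new since the last poll
    print(tracker.orphans)  # {'b': 'old-b'} -- gone; its actor stops
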
diff --git a/services/nodemanager/arvnodeman/jobqueue.py b/services/nodemanager/arvnodeman/jobqueue.py
index bae4930..08ee12e 100644
--- a/services/nodemanager/arvnodeman/jobqueue.py
+++ b/services/nodemanager/arvnodeman/jobqueue.py
@@ -15,7 +15,7 @@ class ServerCalculator(object):
     satisfies each job, and ignoring jobs that can't be satisfied.
     """
 
-    class SizeWrapper(object):
+    class CloudSizeWrapper(object):
         def __init__(self, real_size, **kwargs):
             self.real = real_size
             for name in ['id', 'name', 'ram', 'disk', 'bandwidth', 'price',
@@ -37,8 +37,9 @@ class ServerCalculator(object):
 
 
     def __init__(self, server_list, max_nodes=None):
-        self.sizes = [self.SizeWrapper(s, **kws) for s, kws in server_list]
-        self.sizes.sort(key=lambda s: s.price)
+        self.cloud_sizes = [self.CloudSizeWrapper(s, **kws)
+                            for s, kws in server_list]
+        self.cloud_sizes.sort(key=lambda s: s.price)
         self.max_nodes = max_nodes or float("inf")
 
     @staticmethod
@@ -48,12 +49,12 @@ class ServerCalculator(object):
         except (TypeError, ValueError):
             return fallback
 
-    def size_for_constraints(self, constraints):
+    def cloud_size_for_constraints(self, constraints):
         want_value = lambda key: self.coerce_int(constraints.get(key), 0)
         wants = {'cores': want_value('min_cores_per_node'),
                  'ram': want_value('min_ram_mb_per_node'),
                  'scratch': want_value('min_scratch_mb_per_node')}
-        for size in self.sizes:
+        for size in self.cloud_sizes:
             if size.meets_constraints(**wants):
                 return size
         return None
@@ -63,9 +64,9 @@ class ServerCalculator(object):
         for job in queue:
             constraints = job['runtime_constraints']
             want_count = self.coerce_int(constraints.get('min_nodes'), 1)
-            size = self.size_for_constraints(constraints)
-            if (want_count < self.max_nodes) and (size is not None):
-                servers.extend([size.real] * max(1, want_count))
+            cloud_size = self.cloud_size_for_constraints(constraints)
+            if (want_count < self.max_nodes) and (cloud_size is not None):
+                servers.extend([cloud_size.real] * max(1, want_count))
         return servers
 
 
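The renames in jobqueue.py make clear that these wrappers describe cloud
node sizes. The selection logic itself is unchanged: sizes stay sorted by
price, and the first one meeting every minimum constraint wins, so the
cheapest acceptable size is always chosen. An illustrative, self-contained
sketch of that cheapest-fit selection (class and field names are
hypothetical simplifications of the real wrappers):

    class Size(object):
        def __init__(self, name, cores, ram, scratch, price):
            self.name, self.price = name, price
            self.cores, self.ram, self.scratch = cores, ram, scratch

        def meets_constraints(self, cores=0, ram=0, scratch=0):
            # A size qualifies if it meets or exceeds every minimum.
            return (self.cores >= cores and self.ram >= ram
                    and self.scratch >= scratch)

    # Kept sorted by price, as ServerCalculator.__init__ does, so the
    # first match below is also the cheapest acceptable size.
    cloud_sizes = sorted([Size('big', 16, 65536, 2000, 8.0),
                          Size('small', 2, 4096, 100, 1.0)],
                         key=lambda s: s.price)

    def cloud_size_for_constraints(wants):
        for size in cloud_sizes:
            if size.meets_constraints(**wants):
                return size
        return None  # no configured size satisfies the job

    print(cloud_size_for_constraints({'cores': 4, 'ram': 8192}).name)
    # => big
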
diff --git a/services/nodemanager/arvnodeman/launcher.py b/services/nodemanager/arvnodeman/launcher.py
index 84cb564..87f2dda 100644
--- a/services/nodemanager/arvnodeman/launcher.py
+++ b/services/nodemanager/arvnodeman/launcher.py
@@ -14,7 +14,7 @@ import pykka
 from . import config as nmconfig
 from .computenode import \
     ComputeNodeSetupActor, ComputeNodeShutdownActor, ComputeNodeUpdateActor, \
-    ComputeNodeActor, ShutdownTimer
+    ShutdownTimer
 from .daemon import NodeManagerDaemonActor
 from .jobqueue import JobQueueMonitorActor, ServerCalculator
 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
@@ -63,12 +63,12 @@ def setup_logging(path, level, **sublevels):
 def launch_pollers(config):
     cloud_client = config.new_cloud_client()
     arvados_client = config.new_arvados_client()
-    size_list = config.node_sizes(cloud_client.list_sizes())
-    if not size_list:
-        abort("No valid sizes configured")
+    cloud_size_list = config.node_sizes(cloud_client.list_sizes())
+    if not cloud_size_list:
+        abort("No valid node sizes configured")
 
     server_calculator = ServerCalculator(
-        size_list, config.getint('Daemon', 'max_nodes'))
+        cloud_size_list, config.getint('Daemon', 'max_nodes'))
     poll_time = config.getint('Daemon', 'poll_time')
     max_poll_time = config.getint('Daemon', 'max_poll_time')
 
diff --git a/services/nodemanager/tests/test_computenode.py b/services/nodemanager/tests/test_computenode.py
index 59e40e7..2fc7a50 100644
--- a/services/nodemanager/tests/test_computenode.py
+++ b/services/nodemanager/tests/test_computenode.py
@@ -147,7 +147,8 @@ class ShutdownTimerTestCase(unittest.TestCase):
         self.assertFalse(timer.window_open())
 
 
-class ComputeNodeActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
+class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
+                                      unittest.TestCase):
     class MockShutdownTimer(object):
         def _set_state(self, is_open, next_opening):
             self.window_open = lambda: is_open
@@ -168,7 +169,7 @@ class ComputeNodeActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
         if start_time is None:
             start_time = time.time()
         start_time = time.time()
-        self.node_actor = cnode.ComputeNodeActor.start(
+        self.node_actor = cnode.ComputeNodeMonitorActor.start(
             self.cloud_mock, start_time, self.shutdowns, self.timer,
             self.updates, arv_node).proxy()
         self.node_actor.subscribe(self.subscriber)
diff --git a/services/nodemanager/tests/test_daemon.py b/services/nodemanager/tests/test_daemon.py
index 5f5e733..176b096 100644
--- a/services/nodemanager/tests/test_daemon.py
+++ b/services/nodemanager/tests/test_daemon.py
@@ -96,9 +96,9 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.assertFalse(self.node_setup.called)
 
     def test_no_duplication_when_booting_node_listed_fast(self):
-        # Test that we don't start two ComputeNodeActors when we learn about
-        # a booting node through a listing before we get the "node up"
-        # message from CloudNodeSetupActor.
+        # Test that we don't start two ComputeNodeMonitorActors when
+        # we learn about a booting node through a listing before we
+        # get the "node up" message from CloudNodeSetupActor.
         cloud_node = testutil.cloud_node_mock(1)
         self.make_daemon(want_sizes=[testutil.MockSize(1)])
         self.wait_for_call(self.node_setup.start)
@@ -124,13 +124,13 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         size = testutil.MockSize(1)
         self.make_daemon(cloud_nodes=[cloud_node], want_sizes=[size])
         node_actor = self.node_factory().proxy()
-        self.daemon.shutdown_offer(node_actor).get(self.TIMEOUT)
+        self.daemon.node_can_shutdown(node_actor).get(self.TIMEOUT)
         self.assertFalse(node_actor.shutdown.called)
 
     def test_shutdown_accepted_below_capacity(self):
         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
         node_actor = self.node_factory().proxy()
-        self.daemon.shutdown_offer(node_actor)
+        self.daemon.node_can_shutdown(node_actor)
         self.wait_for_call(self.node_shutdown.start)
 
     def test_clean_shutdown_waits_for_node_setup_finish(self):

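The shutdown_offer -> node_can_shutdown rename reads better from the
daemon's side: the monitor actor announces that its node *can* shut
down, and the daemon decides whether excess capacity warrants acting on
the offer. A minimal sketch of that publish/subscribe handshake (plain
Python with hypothetical names, omitting the pykka plumbing):

    class MonitorSketch(object):
        """Stand-in for ComputeNodeMonitorActor's subscription side."""
        def __init__(self):
            self._subscribers = []

        def subscribe(self, callback):
            self._subscribers.append(callback)

        def consider_shutdown(self, window_open):
            # When the shutdown window opens, notify every subscriber;
            # the real actor passes itself so the daemon can look up
            # the node and start a ComputeNodeShutdownActor.
            if window_open:
                for callback in self._subscribers:
                    callback(self)

    def node_can_shutdown(node_actor):
        # The daemon's slot; the real method first checks
        # _nodes_excess() before accepting the offer.
        print('shutdown offered by %r' % node_actor)

    monitor = MonitorSketch()
    monitor.subscribe(node_can_shutdown)
    monitor.consider_shutdown(window_open=True)
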
-----------------------------------------------------------------------


hooks/post-receive
-- 
