[ARVADOS] updated: a359a8e554dcf343198a04c8297be384fa824a2f
git at public.curoverse.com
Wed Oct 8 09:10:23 EDT 2014
Summary of changes:
.../nodemanager/arvnodeman/computenode/__init__.py | 14 +-
services/nodemanager/arvnodeman/daemon.py | 230 +++++++++++----------
services/nodemanager/arvnodeman/jobqueue.py | 17 +-
services/nodemanager/arvnodeman/launcher.py | 10 +-
services/nodemanager/tests/test_computenode.py | 5 +-
services/nodemanager/tests/test_daemon.py | 10 +-
6 files changed, 147 insertions(+), 139 deletions(-)
via a359a8e554dcf343198a04c8297be384fa824a2f (commit)
from e1a67ca409408d063f8de92438a52957c2c9644c (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit a359a8e554dcf343198a04c8297be384fa824a2f
Author: Brett Smith <brett at curoverse.com>
Date: Wed Oct 8 09:12:17 2014 -0400
2881: Readability improvements suggested by code review.
diff --git a/services/nodemanager/arvnodeman/computenode/__init__.py b/services/nodemanager/arvnodeman/computenode/__init__.py
index e5efa3d..ae25428 100644
--- a/services/nodemanager/arvnodeman/computenode/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/__init__.py
@@ -214,11 +214,11 @@ class ComputeNodeShutdownActor(config.actor_class):
class ComputeNodeUpdateActor(config.actor_class):
"""Actor to dispatch one-off cloud management requests.
- This actor receives requests for small cloud updates, and dispatches them
- to a real driver. ComputeNodeActors use this to perform maintenance
- tasks on themselves. Having a dedicated actor for this gives us the
- opportunity to control the flow of requests; e.g., by backing off when
- errors occur.
+ This actor receives requests for small cloud updates, and
+ dispatches them to a real driver. ComputeNodeMonitorActors use
+ this to perform maintenance tasks on themselves. Having a
+ dedicated actor for this gives us the opportunity to control the
+ flow of requests; e.g., by backing off when errors occur.
This actor is most like a "traditional" Pykka actor: there's no
subscribing, but instead methods return real driver results. If
@@ -299,7 +299,7 @@ class ShutdownTimer(object):
return 0 < (time.time() - self._open_start) < self._open_for
-class ComputeNodeActor(config.actor_class):
+class ComputeNodeMonitorActor(config.actor_class):
"""Actor to manage a running compute node.
This actor gets updates about a compute node's cloud and Arvados records.
@@ -309,7 +309,7 @@ class ComputeNodeActor(config.actor_class):
def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
timer_actor, update_actor, arvados_node=None,
poll_stale_after=600, node_stale_after=3600):
- super(ComputeNodeActor, self).__init__()
+ super(ComputeNodeMonitorActor, self).__init__()
self._later = self.actor_ref.proxy()
self._logger = logging.getLogger('arvnodeman.computenode')
self._last_log = None
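The docstring above describes funneling one-off driver requests through
a single dispatch point so the flow can be throttled when errors occur.
As a rough sketch of that backoff idea (hypothetical names and an
assumed exponential policy, not the actual ComputeNodeUpdateActor
implementation):

    import time

    class BackoffDispatcher(object):
        """Serialize small driver requests; back off after errors."""
        def __init__(self, driver, max_backoff=64):
            self.driver = driver
            self.error_streak = 0
            self.max_backoff = max_backoff

        def dispatch(self, method_name, *args, **kwargs):
            if self.error_streak > 0:
                # Sleep 1, 2, 4, ... seconds, capped at max_backoff.
                time.sleep(min(2 ** (self.error_streak - 1),
                               self.max_backoff))
            try:
                result = getattr(self.driver, method_name)(*args, **kwargs)
            except Exception:
                self.error_streak += 1
                raise
            self.error_streak = 0
            return result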
diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index ff28cc4..5b7437f 100644
--- a/services/nodemanager/arvnodeman/daemon.py
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -11,49 +11,86 @@ import pykka
from . import computenode as cnode
from .config import actor_class
-class NodeManagerDaemonActor(actor_class):
- """Node Manager daemon.
+class _ComputeNodeRecord(object):
+ def __init__(self, actor=None, cloud_node=None, arvados_node=None,
+ assignment_time=float('-inf')):
+ self.actor = actor
+ self.cloud_node = cloud_node
+ self.arvados_node = arvados_node
+ self.assignment_time = assignment_time
- This actor subscribes to all information polls about cloud nodes,
- Arvados nodes, and the job queue. It creates a ComputeNodeActor
- for every cloud node, subscribing them to poll updates
- appropriately, and starts and stops cloud nodes based on job queue
- demand.
- """
- class PairingTracker(object):
- def __init__(self, key_func, paired_items, unpaired_items):
- self.key_func = key_func
- self._paired_items = paired_items
- self._unpaired_items = unpaired_items
-
- def all_items(self, response):
- self.unseen = set(self._paired_items.iterkeys())
- self.unseen.update(self._unpaired_items.iterkeys())
- for item in response:
- key = self.key_func(item)
+
+class _BaseNodeTracker(object):
+ def __init__(self):
+ self.nodes = {}
+ self.orphans = {}
+
+ def __getitem__(self, key):
+ return self.nodes[key]
+
+ def __len__(self):
+ return len(self.nodes)
+
+ def get(self, key, default=None):
+ return self.nodes.get(key, default)
+
+ def record_key(self, record):
+ return self.item_key(getattr(record, self.RECORD_ATTR))
+
+ def add(self, record):
+ self.nodes[self.record_key(record)] = record
+
+ def update_record(self, key, item):
+ setattr(self.nodes[key], self.RECORD_ATTR, item)
+
+ def update_from(self, response):
+ unseen = set(self.nodes.iterkeys())
+ for item in response:
+ key = self.item_key(item)
+ if key in unseen:
+ unseen.remove(key)
+ self.update_record(key, item)
+ else:
yield key, item
- if key in self.unseen:
- self.unseen.remove(key)
+ self.orphans = {key: self.nodes.pop(key) for key in unseen}
+
+ def unpaired(self):
+ return (record for record in self.nodes.itervalues()
+ if getattr(record, self.PAIR_ATTR) is None)
+
+
+class _CloudNodeTracker(_BaseNodeTracker):
+ RECORD_ATTR = 'cloud_node'
+ PAIR_ATTR = 'arvados_node'
+ item_key = staticmethod(lambda cloud_node: cloud_node.id)
- def new_items(self, response):
- for key, item in self.all_items(response):
- if key not in self.unseen:
- yield key, item
- def unpaired_items(self, response):
- for key, item in self.all_items(response):
- if key not in self._paired_items:
- yield key, item
+class _ArvadosNodeTracker(_BaseNodeTracker):
+ RECORD_ATTR = 'arvados_node'
+ PAIR_ATTR = 'cloud_node'
+ item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
- def unseen_items(self):
- for key in self.unseen:
- if key in self._paired_items:
- home_dict = self._paired_items
- else:
- home_dict = self._unpaired_items
- yield home_dict, key
+ def find_stale_node(self, stale_time):
+ for record in self.nodes.itervalues():
+ node = record.arvados_node
+ if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
+ stale_time) and
+ not cnode.timestamp_fresh(record.assignment_time,
+ stale_time)):
+ return node
+ return None
+class NodeManagerDaemonActor(actor_class):
+ """Node Manager daemon.
+
+ This actor subscribes to all information polls about cloud nodes,
+ Arvados nodes, and the job queue. It creates a ComputeNodeMonitorActor
+ for every cloud node, subscribing them to poll updates
+ appropriately. It creates and destroys cloud nodes based on job queue
+ demand, and stops the corresponding ComputeNode actors when their work
+ is done.
+ """
def __init__(self, server_wishlist_actor, arvados_nodes_actor,
cloud_nodes_actor, cloud_update_actor, timer_actor,
arvados_factory, cloud_factory,
@@ -61,7 +98,7 @@ class NodeManagerDaemonActor(actor_class):
poll_stale_after=600, node_stale_after=7200,
node_setup_class=cnode.ComputeNodeSetupActor,
node_shutdown_class=cnode.ComputeNodeShutdownActor,
- node_actor_class=cnode.ComputeNodeActor):
+ node_actor_class=cnode.ComputeNodeMonitorActor):
super(NodeManagerDaemonActor, self).__init__()
self._node_setup = node_setup_class
self._node_shutdown = node_shutdown_class
@@ -83,12 +120,8 @@ class NodeManagerDaemonActor(actor_class):
poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
setattr(self, '_{}_actor'.format(poll_name), poll_actor)
self.last_polls[poll_name] = -self.poll_stale_after
- # Map cloud node IDs, or Arvados node UUIDs, to their ComputeNodeActors.
- self.unpaired_clouds = {}
- self.paired_clouds = {}
- self.paired_arv = {}
- self.unpaired_arv = {} # Arvados node UUIDs to full node data
- self.assigned_arv = {} # Arvados node UUIDs to assignment timestamps
+ self.cloud_nodes = _CloudNodeTracker()
+ self.arvados_nodes = _ArvadosNodeTracker()
self.booting = {} # Actor IDs to ComputeNodeSetupActors
self.shutdowns = {} # Cloud node IDs to ComputeNodeShutdownActors
self._logger.debug("Daemon initialized")
@@ -96,19 +129,15 @@ class NodeManagerDaemonActor(actor_class):
def _update_poll_time(self, poll_key):
self.last_polls[poll_key] = time.time()
- def _pair_nodes(self, cloud_key, arv_key, actor=None):
- if actor is None:
- actor = self.unpaired_clouds[cloud_key]
+ def _pair_nodes(self, node_record, arvados_node):
self._logger.info("Cloud node %s has associated with Arvados node %s",
- cloud_key, arv_key)
- self.paired_clouds[cloud_key] = actor
- self.paired_arv[arv_key] = actor
- self._arvados_nodes_actor.subscribe_to(arv_key,
- actor.update_arvados_node)
- self.unpaired_clouds.pop(cloud_key, None)
- self.unpaired_arv.pop(arv_key, None)
-
- def _new_node(self, cloud_node, arvados_node=None):
+ node_record.cloud_node.id, arvados_node['uuid'])
+ self._arvados_nodes_actor.subscribe_to(
+ arvados_node['uuid'], node_record.actor.update_arvados_node)
+ node_record.arvados_node = arvados_node
+ self.arvados_nodes.add(node_record)
+
+ def _new_node(self, cloud_node):
start_time = self._cloud_driver.node_start_time(cloud_node)
shutdown_timer = cnode.ShutdownTimer(start_time,
self.shutdown_windows)
@@ -118,53 +147,45 @@ class NodeManagerDaemonActor(actor_class):
shutdown_timer=shutdown_timer,
update_actor=self._cloud_updater,
timer_actor=self._timer,
- arvados_node=arvados_node,
+ arvados_node=None,
poll_stale_after=self.poll_stale_after,
node_stale_after=self.node_stale_after).proxy()
- actor.subscribe(self._later.shutdown_offer)
+ actor.subscribe(self._later.node_can_shutdown)
self._cloud_nodes_actor.subscribe_to(cloud_node.id,
actor.update_cloud_node)
- if arvados_node is not None:
- self._pair_nodes(cloud_node.id, arvados_node['uuid'], actor)
- return actor
+ record = _ComputeNodeRecord(actor, cloud_node)
+ self.cloud_nodes.add(record)
+ return record
def update_cloud_nodes(self, nodelist):
self._update_poll_time('cloud_nodes')
- pairs = self.PairingTracker(lambda n: n.id,
- self.paired_clouds, self.unpaired_clouds)
- for key, node in pairs.new_items(nodelist):
- actor = self._new_node(node)
- for arv_key, arv_node in self.unpaired_arv.iteritems():
- if actor.offer_arvados_pair(arv_node).get():
- self._pair_nodes(key, arv_key, actor)
+ for key, node in self.cloud_nodes.update_from(nodelist):
+ self._logger.info("Registering new cloud node %s", key)
+ record = self._new_node(node)
+ for arv_rec in self.arvados_nodes.unpaired():
+ if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
+ self._pair_nodes(record, arv_rec.arvados_node)
break
- else:
- self._logger.info("Registering new cloud node %s", key)
- self.unpaired_clouds[key] = actor
- for source, key in pairs.unseen_items():
- source.pop(key).stop()
+ for key, record in self.cloud_nodes.orphans.iteritems():
+ record.actor.stop()
if key in self.shutdowns:
self.shutdowns.pop(key).stop()
def update_arvados_nodes(self, nodelist):
self._update_poll_time('arvados_nodes')
- pairs = self.PairingTracker(lambda n: n['uuid'],
- self.paired_arv, self.unpaired_arv)
- for key, node in pairs.unpaired_items(nodelist):
- if key not in self.unpaired_arv:
- self._logger.info("Registering new Arvados node %s", key)
- self.unpaired_arv[key] = node
- for cloud_key, actor in self.unpaired_clouds.iteritems():
- if actor.offer_arvados_pair(node).get():
- self._pair_nodes(cloud_key, key, actor)
+ for key, node in self.arvados_nodes.update_from(nodelist):
+ self._logger.info("Registering new Arvados node %s", key)
+ record = _ComputeNodeRecord(arvados_node=node)
+ self.arvados_nodes.add(record)
+ for arv_rec in self.arvados_nodes.unpaired():
+ arv_node = arv_rec.arvados_node
+ for cloud_rec in self.cloud_nodes.unpaired():
+ if cloud_rec.actor.offer_arvados_pair(arv_node).get():
+ self._pair_nodes(cloud_rec, arv_node)
break
- for source, key in pairs.unseen_items():
- if source is self.unpaired_arv:
- del self.unpaired_arv[key]
def _node_count(self):
- up = sum(len(nodelist) for nodelist in
- [self.paired_clouds, self.unpaired_clouds, self.booting])
+ up = sum(len(nodelist) for nodelist in [self.cloud_nodes, self.booting])
return up - len(self.shutdowns)
def _nodes_wanted(self):
@@ -199,35 +220,25 @@ class NodeManagerDaemonActor(actor_class):
return None
return wrapper
- def _find_reusable_arvados_node(self):
- for node in self.unpaired_arv.itervalues():
- assigned_at = self.assigned_arv.get(node['uuid'],
- -self.node_stale_after)
- if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
- self.node_stale_after) and
- not cnode.timestamp_fresh(assigned_at,
- self.node_stale_after)):
- return node
- return None
-
@_check_poll_freshness
def start_node(self):
nodes_wanted = self._nodes_wanted()
if nodes_wanted < 1:
return None
- arvados_node = self._find_reusable_arvados_node()
- size = self.last_wishlist[nodes_wanted - 1]
+ arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
+ cloud_size = self.last_wishlist[nodes_wanted - 1]
self._logger.info("Want %s more nodes. Booting a %s node.",
- nodes_wanted, size.name)
+ nodes_wanted, cloud_size.name)
new_setup = self._node_setup.start(
timer_actor=self._timer,
arvados_client=self._new_arvados(),
arvados_node=arvados_node,
cloud_client=self._new_cloud(),
- cloud_size=size).proxy()
+ cloud_size=cloud_size).proxy()
self.booting[new_setup.actor_ref.actor_urn] = new_setup
if arvados_node is not None:
- self.assigned_arv[arvados_node['uuid']] = time.time()
+ self.arvados_nodes[arvados_node['uuid']].assignment_time = (
+ time.time())
new_setup.subscribe(self._later.node_up)
if nodes_wanted > 1:
self._later.start_node()
@@ -237,17 +248,12 @@ class NodeManagerDaemonActor(actor_class):
def node_up(self, setup_proxy):
cloud_node, arvados_node = self._actor_nodes(setup_proxy)
- cloud_key = cloud_node.id
- arv_key = arvados_node['uuid']
del self.booting[setup_proxy.actor_ref.actor_urn]
- self.assigned_arv.pop(arv_key, None)
- if cloud_key in self.unpaired_clouds:
- if self.unpaired_clouds[cloud_key].offer_arvados_pair(
- arvados_node).get():
- self._pair_nodes(cloud_key, arv_key)
- elif cloud_key not in self.paired_clouds:
- self._new_node(cloud_node, arvados_node)
setup_proxy.stop()
+ record = self.cloud_nodes.get(cloud_node.id)
+ if record is None:
+ record = self._new_node(cloud_node)
+ self._pair_nodes(record, arvados_node)
@_check_poll_freshness
def stop_booting_node(self):
@@ -263,7 +269,7 @@ class NodeManagerDaemonActor(actor_class):
break
@_check_poll_freshness
- def shutdown_offer(self, node_actor):
+ def node_can_shutdown(self, node_actor):
if self._nodes_excess() < 1:
return None
cloud_node, arvados_node = self._actor_nodes(node_actor)
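The tracker classes above replace the ad-hoc paired/unpaired
dictionaries. update_from() is a generator: it updates records for
items seen on a previous poll, yields only genuinely new items, and,
once exhausted, moves records for vanished items into .orphans. A
minimal usage sketch, assuming the _CloudNodeTracker and
_ComputeNodeRecord classes from this diff are in scope (FakeCloudNode
is a stand-in for a real libcloud node object):

    import collections

    FakeCloudNode = collections.namedtuple('FakeCloudNode', ['id'])

    tracker = _CloudNodeTracker()
    for key, node in tracker.update_from([FakeCloudNode('n1'),
                                          FakeCloudNode('n2')]):
        # First poll: every node is new, so both are yielded here.
        tracker.add(_ComputeNodeRecord(cloud_node=node))

    # Second poll: 'n1' has disappeared from the cloud listing.
    for key, node in tracker.update_from([FakeCloudNode('n2')]):
        pass  # nothing new; 'n2' is updated in place instead
    assert 'n1' in tracker.orphans  # vanished records land here
    assert len(tracker) == 1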
diff --git a/services/nodemanager/arvnodeman/jobqueue.py b/services/nodemanager/arvnodeman/jobqueue.py
index bae4930..08ee12e 100644
--- a/services/nodemanager/arvnodeman/jobqueue.py
+++ b/services/nodemanager/arvnodeman/jobqueue.py
@@ -15,7 +15,7 @@ class ServerCalculator(object):
satisfies each job, and ignoring jobs that can't be satisfied.
"""
- class SizeWrapper(object):
+ class CloudSizeWrapper(object):
def __init__(self, real_size, **kwargs):
self.real = real_size
for name in ['id', 'name', 'ram', 'disk', 'bandwidth', 'price',
@@ -37,8 +37,9 @@ class ServerCalculator(object):
def __init__(self, server_list, max_nodes=None):
- self.sizes = [self.SizeWrapper(s, **kws) for s, kws in server_list]
- self.sizes.sort(key=lambda s: s.price)
+ self.cloud_sizes = [self.CloudSizeWrapper(s, **kws)
+ for s, kws in server_list]
+ self.cloud_sizes.sort(key=lambda s: s.price)
self.max_nodes = max_nodes or float("inf")
@staticmethod
@@ -48,12 +49,12 @@ class ServerCalculator(object):
except (TypeError, ValueError):
return fallback
- def size_for_constraints(self, constraints):
+ def cloud_size_for_constraints(self, constraints):
want_value = lambda key: self.coerce_int(constraints.get(key), 0)
wants = {'cores': want_value('min_cores_per_node'),
'ram': want_value('min_ram_mb_per_node'),
'scratch': want_value('min_scratch_mb_per_node')}
- for size in self.sizes:
+ for size in self.cloud_sizes:
if size.meets_constraints(**wants):
return size
return None
@@ -63,9 +64,9 @@ class ServerCalculator(object):
for job in queue:
constraints = job['runtime_constraints']
want_count = self.coerce_int(constraints.get('min_nodes'), 1)
- size = self.size_for_constraints(constraints)
- if (want_count < self.max_nodes) and (size is not None):
- servers.extend([size.real] * max(1, want_count))
+ cloud_size = self.cloud_size_for_constraints(constraints)
+ if (want_count < self.max_nodes) and (cloud_size is not None):
+ servers.extend([cloud_size.real] * max(1, want_count))
return servers
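Because __init__ sorts cloud_sizes by ascending price,
cloud_size_for_constraints() returns the cheapest size whose resources
cover the job's minimums (the per-attribute comparison lives in
CloudSizeWrapper.meets_constraints, which this hunk does not show). A
self-contained sketch of that cheapest-match selection, using stand-in
size objects rather than real libcloud sizes:

    import collections

    FakeSize = collections.namedtuple('FakeSize',
                                      ['name', 'cores', 'ram', 'price'])

    def cheapest_size(sizes, cores=0, ram=0):
        # Mirrors the scan in cloud_size_for_constraints: sizes are
        # checked in ascending price order, first match wins.
        for size in sorted(sizes, key=lambda s: s.price):
            if size.cores >= cores and size.ram >= ram:
                return size
        return None  # no configured size satisfies the job

    sizes = [FakeSize('small', 1, 2048, 0.05),
             FakeSize('large', 8, 16384, 0.40)]
    assert cheapest_size(sizes, cores=4).name == 'large'
    assert cheapest_size(sizes, cores=32) is None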
diff --git a/services/nodemanager/arvnodeman/launcher.py b/services/nodemanager/arvnodeman/launcher.py
index 84cb564..87f2dda 100644
--- a/services/nodemanager/arvnodeman/launcher.py
+++ b/services/nodemanager/arvnodeman/launcher.py
@@ -14,7 +14,7 @@ import pykka
from . import config as nmconfig
from .computenode import \
ComputeNodeSetupActor, ComputeNodeShutdownActor, ComputeNodeUpdateActor, \
- ComputeNodeActor, ShutdownTimer
+ ShutdownTimer
from .daemon import NodeManagerDaemonActor
from .jobqueue import JobQueueMonitorActor, ServerCalculator
from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
@@ -63,12 +63,12 @@ def setup_logging(path, level, **sublevels):
def launch_pollers(config):
cloud_client = config.new_cloud_client()
arvados_client = config.new_arvados_client()
- size_list = config.node_sizes(cloud_client.list_sizes())
- if not size_list:
- abort("No valid sizes configured")
+ cloud_size_list = config.node_sizes(cloud_client.list_sizes())
+ if not cloud_size_list:
+ abort("No valid node sizes configured")
server_calculator = ServerCalculator(
- size_list, config.getint('Daemon', 'max_nodes'))
+ cloud_size_list, config.getint('Daemon', 'max_nodes'))
poll_time = config.getint('Daemon', 'poll_time')
max_poll_time = config.getint('Daemon', 'max_poll_time')
diff --git a/services/nodemanager/tests/test_computenode.py b/services/nodemanager/tests/test_computenode.py
index 59e40e7..2fc7a50 100644
--- a/services/nodemanager/tests/test_computenode.py
+++ b/services/nodemanager/tests/test_computenode.py
@@ -147,7 +147,8 @@ class ShutdownTimerTestCase(unittest.TestCase):
self.assertFalse(timer.window_open())
-class ComputeNodeActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
+class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
+ unittest.TestCase):
class MockShutdownTimer(object):
def _set_state(self, is_open, next_opening):
self.window_open = lambda: is_open
@@ -168,7 +169,7 @@ class ComputeNodeActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
if start_time is None:
start_time = time.time()
- self.node_actor = cnode.ComputeNodeActor.start(
+ self.node_actor = cnode.ComputeNodeMonitorActor.start(
self.cloud_mock, start_time, self.shutdowns, self.timer,
self.updates, arv_node).proxy()
self.node_actor.subscribe(self.subscriber)
diff --git a/services/nodemanager/tests/test_daemon.py b/services/nodemanager/tests/test_daemon.py
index 5f5e733..176b096 100644
--- a/services/nodemanager/tests/test_daemon.py
+++ b/services/nodemanager/tests/test_daemon.py
@@ -96,9 +96,9 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
self.assertFalse(self.node_setup.called)
def test_no_duplication_when_booting_node_listed_fast(self):
- # Test that we don't start two ComputeNodeActors when we learn about
- # a booting node through a listing before we get the "node up"
- # message from CloudNodeSetupActor.
+ # Test that we don't start two ComputeNodeMonitorActors when
+ # we learn about a booting node through a listing before we
+ # get the "node up" message from CloudNodeSetupActor.
cloud_node = testutil.cloud_node_mock(1)
self.make_daemon(want_sizes=[testutil.MockSize(1)])
self.wait_for_call(self.node_setup.start)
@@ -124,13 +124,13 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
size = testutil.MockSize(1)
self.make_daemon(cloud_nodes=[cloud_node], want_sizes=[size])
node_actor = self.node_factory().proxy()
- self.daemon.shutdown_offer(node_actor).get(self.TIMEOUT)
+ self.daemon.node_can_shutdown(node_actor).get(self.TIMEOUT)
self.assertFalse(node_actor.shutdown.called)
def test_shutdown_accepted_below_capacity(self):
self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
node_actor = self.node_factory().proxy()
- self.daemon.shutdown_offer(node_actor)
+ self.daemon.node_can_shutdown(node_actor)
self.wait_for_call(self.node_shutdown.start)
def test_clean_shutdown_waits_for_node_setup_finish(self):
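These two tests exercise both sides of the renamed node_can_shutdown()
message: an offer is declined while every node is still wanted, and
accepted once there is surplus capacity. The gate amounts to roughly
the following check (a paraphrase of the daemon's _nodes_excess() test,
not its literal code):

    def accepts_shutdown(nodes_up, nodes_wanted):
        # Decline unless at least one node is surplus to the wishlist.
        return (nodes_up - nodes_wanted) >= 1

    assert not accepts_shutdown(nodes_up=1, nodes_wanted=1)  # declined
    assert accepts_shutdown(nodes_up=1, nodes_wanted=0)      # accepted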
-----------------------------------------------------------------------
hooks/post-receive