[ARVADOS] created: fd75f94f65d11989985bffe8d85fb6da7f75720e
Git user
git at public.curoverse.com
Fri May 13 10:11:46 EDT 2016
at fd75f94f65d11989985bffe8d85fb6da7f75720e (commit)
commit fd75f94f65d11989985bffe8d85fb6da7f75720e
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri May 13 10:11:39 2016 -0400
9161: Eliminate 'booted' list and put nodes directly into cloud_nodes list.
Refactor logic for registering cloud nodes. Refactor computation of nodes
wanted; explicitly model 'unpaired' and 'down'.
diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
index 0d63dc9..b43e8a3 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
@@ -88,7 +88,7 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
Manager to handle).
"""
def __init__(self, timer_actor, arvados_client, cloud_client,
- cloud_size, arvados_node=None,
+ cloud_size, arvados_node,
retry_wait=1, max_retry_wait=180):
super(ComputeNodeSetupActor, self).__init__(
cloud_client, arvados_client, timer_actor,
@@ -96,16 +96,7 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
self.cloud_size = cloud_size
self.arvados_node = None
self.cloud_node = None
- if arvados_node is None:
- self._later.create_arvados_node()
- else:
- self._later.prepare_arvados_node(arvados_node)
-
- @ComputeNodeStateChangeBase._finish_on_exception
- @RetryMixin._retry(config.ARVADOS_ERRORS)
- def create_arvados_node(self):
- self.arvados_node = self._arvados.nodes().create(body={}).execute()
- self._later.create_cloud_node()
+ self._later.prepare_arvados_node(arvados_node)
@ComputeNodeStateChangeBase._finish_on_exception
@RetryMixin._retry(config.ARVADOS_ERRORS)
@@ -353,13 +344,20 @@ class ComputeNodeMonitorActor(config.actor_class):
# There's a window between when a node pings for the first time and the
# value of 'slurm_state' is synchronized by crunch-dispatch. In this
- # window, the node will still report as 'down'. Check first_ping_at
- # and implement a grace period where the node should will be considered
- # 'idle'.
- if state == 'down' and timestamp_fresh(
- arvados_timestamp(self.arvados_node['first_ping_at']), self.poll_stale_after):
+ # window, the node will still report as 'down'. Check that
+ # first_ping_at is truthy and consider the node 'idle' during the
+ # initial boot grace period.
+ if (state == 'down' and
+ self.arvados_node['first_ping_at'] and
+ timestamp_fresh(self.cloud_node_start_time,
+ self.boot_fail_after)):
state = 'idle'
+ # "missing" means last_ping_at is stale, this should be
+ # considered "down"
+ if arvados_node_missing(self.arvados_node, self.node_stale_after):
+ state = 'down'
+
result = state in states
if state == 'idle':
result = result and not self.arvados_node['job_uuid']
@@ -384,8 +382,12 @@ class ComputeNodeMonitorActor(config.actor_class):
crunch_worker_state = 'unpaired'
elif not timestamp_fresh(arvados_node_mtime(self.arvados_node), self.node_stale_after):
return (False, "node state is stale")
- elif self.arvados_node['crunch_worker_state']:
- crunch_worker_state = self.arvados_node['crunch_worker_state']
+ elif self.in_state('down'):
+ crunch_worker_state = 'down'
+ elif self.in_state('idle'):
+ crunch_worker_state = 'idle'
+ elif self.in_state('busy'):
+ crunch_worker_state = 'busy'
else:
return (False, "node is paired but crunch_worker_state is '%s'" % self.arvados_node['crunch_worker_state'])
diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index 1120440..c4b0f3b 100644
--- a/services/nodemanager/arvnodeman/daemon.py
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -67,6 +67,10 @@ class _BaseNodeTracker(object):
return (record for record in self.nodes.itervalues()
if getattr(record, self.PAIR_ATTR) is None)
+ def paired(self):
+ return (record for record in self.nodes.itervalues()
+ if getattr(record, self.PAIR_ATTR) is not None)
+
class _CloudNodeTracker(_BaseNodeTracker):
RECORD_ATTR = 'cloud_node'
@@ -139,8 +143,7 @@ class NodeManagerDaemonActor(actor_class):
self.last_polls[poll_name] = -self.poll_stale_after
self.cloud_nodes = _CloudNodeTracker()
self.arvados_nodes = _ArvadosNodeTracker()
- self.booting = {} # Actor IDs to ComputeNodeSetupActors
- self.booted = {} # Cloud node IDs to _ComputeNodeRecords
+ self.booting = {} # Arvados node UUID to ComputeNodeSetupActors
self.shutdowns = {} # Cloud node IDs to ComputeNodeShutdownActors
self.sizes_booting_shutdown = {} # Actor IDs or Cloud node IDs to node size
@@ -182,19 +185,22 @@ class NodeManagerDaemonActor(actor_class):
record = _ComputeNodeRecord(actor.proxy(), cloud_node)
return record
+ def _register_cloud_node(self, node):
+ rec = self.cloud_nodes.get(node.id)
+ if rec is None:
+ self._logger.info("Registering new cloud node %s", node.id)
+ record = self._new_node(node)
+ self.cloud_nodes.add(record)
+ else:
+ rec.cloud_node = node
+
def update_cloud_nodes(self, nodelist):
self._update_poll_time('cloud_nodes')
- for key, node in self.cloud_nodes.update_from(nodelist):
- self._logger.info("Registering new cloud node %s", key)
- if key in self.booted:
- record = self.booted.pop(key)
- else:
- record = self._new_node(node)
- self.cloud_nodes.add(record)
- for arv_rec in self.arvados_nodes.unpaired():
- if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
- self._pair_nodes(record, arv_rec.arvados_node)
- break
+ for _, node in self.cloud_nodes.update_from(nodelist):
+ self._register_cloud_node(node)
+
+ self.try_pairing()
+
for key, record in self.cloud_nodes.orphans.iteritems():
if key in self.shutdowns:
try:
@@ -206,26 +212,29 @@ class NodeManagerDaemonActor(actor_class):
record.actor.stop()
record.cloud_node = None
+ def _register_arvados_node(self, key, arv_node):
+ self._logger.info("Registering new Arvados node %s", key)
+ record = _ComputeNodeRecord(arvados_node=arv_node)
+ self.arvados_nodes.add(record)
+
def update_arvados_nodes(self, nodelist):
self._update_poll_time('arvados_nodes')
for key, node in self.arvados_nodes.update_from(nodelist):
- self._logger.info("Registering new Arvados node %s", key)
- record = _ComputeNodeRecord(arvados_node=node)
- self.arvados_nodes.add(record)
- for arv_rec in self.arvados_nodes.unpaired():
- arv_node = arv_rec.arvados_node
- for cloud_rec in self.cloud_nodes.unpaired():
- if cloud_rec.actor.offer_arvados_pair(arv_node).get():
- self._pair_nodes(cloud_rec, arv_node)
+ self._register_arvados_node(key, node)
+ self.try_pairing()
+
+ def try_pairing(self):
+ for record in self.cloud_nodes.unpaired():
+ for arv_rec in self.arvados_nodes.unpaired():
+ if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
+ self._pair_nodes(record, arv_rec.arvados_node)
break
+
def _nodes_booting(self, size):
s = sum(1
for c in self.booting.iterkeys()
if size is None or self.sizes_booting_shutdown[c].id == size.id)
- s += sum(1
- for c in self.booted.itervalues()
- if size is None or c.cloud_node.size.id == size.id)
return s
def _nodes_unpaired(self, size):
@@ -233,9 +242,9 @@ class NodeManagerDaemonActor(actor_class):
for c in self.cloud_nodes.unpaired()
if size is None or c.cloud_node.size.id == size.id)
- def _nodes_booted(self, size):
+ def _nodes_paired(self, size):
return sum(1
- for c in self.cloud_nodes.nodes.itervalues()
+ for c in self.cloud_nodes.paired()
if size is None or c.cloud_node.size.id == size.id)
def _nodes_down(self, size):
@@ -249,7 +258,7 @@ class NodeManagerDaemonActor(actor_class):
if down)
def _nodes_up(self, size):
- up = (self._nodes_booting(size) + self._nodes_booted(size)) - self._nodes_down(size)
+ up = (self._nodes_booting(size) + self._nodes_unpaired(size) + self._nodes_paired(size)) - self._nodes_down(size)
return up
def _total_price(self):
@@ -257,8 +266,7 @@ class NodeManagerDaemonActor(actor_class):
cost += sum(self.server_calculator.find_size(self.sizes_booting_shutdown[c].id).price
for c in self.booting.iterkeys())
cost += sum(self.server_calculator.find_size(c.cloud_node.size.id).price
- for i in (self.booted, self.cloud_nodes.nodes)
- for c in i.itervalues())
+ for c in self.cloud_nodes.nodes.itervalues())
return cost
def _nodes_busy(self, size):
@@ -268,13 +276,6 @@ class NodeManagerDaemonActor(actor_class):
if rec.cloud_node.size.id == size.id)
if busy)
- def _nodes_missing(self, size):
- return sum(1 for arv_node in
- pykka.get_all(rec.actor.arvados_node for rec in
- self.cloud_nodes.nodes.itervalues()
- if rec.cloud_node.size.id == size.id and rec.actor.cloud_node.get().id not in self.shutdowns)
- if arv_node and cnode.arvados_node_missing(arv_node, self.node_stale_after))
-
def _size_wishlist(self, size):
return sum(1 for c in self.last_wishlist if c.id == size.id)
@@ -299,20 +300,26 @@ class NodeManagerDaemonActor(actor_class):
elif under_min > 0 and size.id == self.min_cloud_size.id:
return under_min
- booting_count = self._nodes_booting(size) + self._nodes_unpaired(size)
- shutdown_count = self._size_shutdowns(size)
+ up_count = self._nodes_up(size)
+ booting_count = self._nodes_booting(size)
+ unpaired_count = self._nodes_unpaired(size)
+ paired_count = self._nodes_paired(size)
busy_count = self._nodes_busy(size)
- idle_count = self._nodes_up(size) - (busy_count + self._nodes_missing(size))
+ down_count = self._nodes_down(size)
+ idle_count = paired_count - (busy_count+down_count)
+ shutdown_count = self._size_shutdowns(size)
- self._logger.info("%s: wishlist %i, up %i (booting %i, idle %i, busy %i), shutting down %i", size.name,
+ self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.name,
self._size_wishlist(size),
- idle_count + busy_count,
+ up_count,
booting_count,
- idle_count - booting_count,
+ unpaired_count,
+ idle_count,
busy_count,
+ down_count,
shutdown_count)
- wanted = self._size_wishlist(size) - idle_count
+ wanted = self._size_wishlist(size) - (up_count - busy_count)
if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
can_boot = int((self.max_total_price - total_price) / size.price)
if can_boot == 0:
@@ -323,10 +330,10 @@ class NodeManagerDaemonActor(actor_class):
return wanted
def _nodes_excess(self, size):
- up_count = (self._nodes_booting(size) + self._nodes_booted(size)) - self._size_shutdowns(size)
+ up_count = self._nodes_up(size)
if size.id == self.min_cloud_size.id:
up_count -= self.min_nodes
- return up_count - self._nodes_busy(size) - self._size_wishlist(size)
+ return up_count - (self._nodes_busy(size) + self._size_wishlist(size))
def update_server_wishlist(self, wishlist):
self._update_poll_time('server_wishlist')
@@ -363,17 +370,23 @@ class NodeManagerDaemonActor(actor_class):
nodes_wanted = self._nodes_wanted(cloud_size)
if nodes_wanted < 1:
return None
- arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
self._logger.info("Want %i more %s nodes. Booting a node.",
nodes_wanted, cloud_size.name)
+
+ arvados_client=self._new_arvados()
+ arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
+ if not arvados_node:
+ arvados_node = arvados_client.nodes().create(body={}).execute()
+ self._register_arvados_node(arvados_node["uuid"], arvados_node)
+
new_setup = self._node_setup.start(
timer_actor=self._timer,
- arvados_client=self._new_arvados(),
+ arvados_client=arvados_client,
arvados_node=arvados_node,
cloud_client=self._new_cloud(),
cloud_size=cloud_size).proxy()
- self.booting[new_setup.actor_ref.actor_urn] = new_setup
- self.sizes_booting_shutdown[new_setup.actor_ref.actor_urn] = cloud_size
+ self.booting[arvados_node['uuid']] = new_setup
+ self.sizes_booting_shutdown[arvados_node['uuid']] = cloud_size
if arvados_node is not None:
self.arvados_nodes[arvados_node['uuid']].assignment_time = (
@@ -386,18 +399,20 @@ class NodeManagerDaemonActor(actor_class):
return pykka.get_all([getattr(actor, name) for name in attr_names])
def node_up(self, setup_proxy):
- cloud_node = setup_proxy.cloud_node.get()
- del self.booting[setup_proxy.actor_ref.actor_urn]
- del self.sizes_booting_shutdown[setup_proxy.actor_ref.actor_urn]
-
+ # Called when a SetupActor has completed.
+ cloud_node, arvados_node = self._get_actor_attrs(
+ setup_proxy, 'cloud_node', 'arvados_node')
setup_proxy.stop()
+
+ # If cloud_node is None then the node create wasn't
+ # successful and so there isn't anything to do.
if cloud_node is not None:
- record = self.cloud_nodes.get(cloud_node.id)
- if record is None:
- record = self._new_node(cloud_node)
- self.booted[cloud_node.id] = record
- self._timer.schedule(time.time() + self.boot_fail_after,
- self._later.shutdown_unpaired_node, cloud_node.id)
+ # Node creation succeeded. Update cloud node list.
+ self._register_cloud_node(cloud_node)
+ arvuuid = arvados_node["uuid"]
+ if arvuuid in self.booting:
+ del self.booting[arvuuid]
+ del self.sizes_booting_shutdown[arvuuid]
@_check_poll_freshness
def stop_booting_node(self, size):
@@ -405,7 +420,7 @@ class NodeManagerDaemonActor(actor_class):
if (nodes_excess < 1) or not self.booting:
return None
for key, node in self.booting.iteritems():
- if node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
+ if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
del self.booting[key]
del self.sizes_booting_shutdown[key]
@@ -430,29 +445,29 @@ class NodeManagerDaemonActor(actor_class):
def node_can_shutdown(self, node_actor):
if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
self._begin_node_shutdown(node_actor, cancellable=True)
-
- def shutdown_unpaired_node(self, cloud_node_id):
- for record_dict in [self.cloud_nodes, self.booted]:
- if cloud_node_id in record_dict:
- record = record_dict[cloud_node_id]
- break
- else:
- return None
- if not record.actor.in_state('idle', 'busy').get():
- self._begin_node_shutdown(record.actor, cancellable=False)
+ elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
+ # Node is unpaired, which means it probably exceeded its booting
+ # grace period without a ping, so shut it down so we can boot a new
+ # node in its place.
+ self._begin_node_shutdown(node_actor, cancellable=False)
+ elif node_actor.in_state('down'):
+ # Node is down and unlikely to come back.
+ self._begin_node_shutdown(node_actor, cancellable=False)
def node_finished_shutdown(self, shutdown_actor):
cloud_node, success, cancel_reason = self._get_actor_attrs(
shutdown_actor, 'cloud_node', 'success', 'cancel_reason')
- shutdown_actor.stop()
cloud_node_id = cloud_node.id
+ shutdown_actor.stop()
if not success:
if cancel_reason == self._node_shutdown.NODE_BROKEN:
self.cloud_nodes.blacklist(cloud_node_id)
- elif cloud_node_id in self.booted:
- self.booted.pop(cloud_node_id).actor.stop()
- del self.shutdowns[cloud_node_id]
- del self.sizes_booting_shutdown[cloud_node_id]
+ del self.shutdowns[cloud_node_id]
+ del self.sizes_booting_shutdown[cloud_node_id]
+ # On success, we want to leave the entry in self.shutdowns so that it
+ # won't try to shut down the node again. It should disappear from the
+ # cloud node list, and the entry in self.shutdowns will get cleaned up
+ # by update_cloud_nodes.
def shutdown(self):
self._logger.info("Shutting down after signal.")
diff --git a/services/nodemanager/tests/test_daemon.py b/services/nodemanager/tests/test_daemon.py
index d52cdae..3cc3e78 100644
--- a/services/nodemanager/tests/test_daemon.py
+++ b/services/nodemanager/tests/test_daemon.py
@@ -55,6 +55,11 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
for name in ['cloud_nodes', 'arvados_nodes', 'server_wishlist']:
setattr(self, name + '_poller', mock.MagicMock(name=name + '_mock'))
self.arv_factory = mock.MagicMock(name='arvados_mock')
+ api_client = mock.MagicMock(name='api_client')
+ api_client.nodes().create().execute.side_effect = [testutil.arvados_node_mock(1),
+ testutil.arvados_node_mock(2)]
+ self.arv_factory.return_value = api_client
+
self.cloud_factory = mock.MagicMock(name='cloud_mock')
self.cloud_factory().node_start_time.return_value = time.time()
self.cloud_updates = mock.MagicMock(name='updates_mock')
@@ -76,10 +81,10 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
min_nodes, max_nodes, 600, 1800, 3600,
self.node_setup, self.node_shutdown,
max_total_price=max_total_price).proxy()
- if cloud_nodes is not None:
- self.daemon.update_cloud_nodes(cloud_nodes).get(self.TIMEOUT)
if arvados_nodes is not None:
self.daemon.update_arvados_nodes(arvados_nodes).get(self.TIMEOUT)
+ if cloud_nodes is not None:
+ self.daemon.update_cloud_nodes(cloud_nodes).get(self.TIMEOUT)
if want_sizes is not None:
self.daemon.update_server_wishlist(want_sizes).get(self.TIMEOUT)
@@ -167,7 +172,8 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1),
testutil.cloud_node_mock(2)],
arvados_nodes=[testutil.arvados_node_mock(1),
- testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
+ testutil.arvados_node_mock(2,
+ last_ping_at='1970-01-01T01:02:03.04050607Z')],
want_sizes=[size, size])
self.stop_proxy(self.daemon)
self.assertTrue(self.node_setup.start.called)
@@ -351,6 +357,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
shutdown = self.node_shutdown.start().proxy()
shutdown.cloud_node.get.return_value = cloud_node
self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
+ self.daemon.update_cloud_nodes([])
self.assertTrue(shutdown.stop.called,
"shutdown actor not stopped after finishing")
self.assertTrue(monitor.actor_ref.actor_stopped.wait(self.TIMEOUT),
@@ -363,20 +370,25 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
def test_booted_node_shut_down_when_never_listed(self):
setup = self.start_node_boot()
+ self.cloud_factory().node_start_time.return_value = time.time() - 3601
self.daemon.node_up(setup).get(self.TIMEOUT)
self.assertEqual(1, self.alive_monitor_count())
self.assertFalse(self.node_shutdown.start.called)
- self.timer.deliver()
+ now = time.time()
+ self.monitor_list()[0].tell_proxy().consider_shutdown()
+ self.busywait(lambda: self.node_shutdown.start.called)
self.stop_proxy(self.daemon)
self.assertShutdownCancellable(False)
def test_booted_node_shut_down_when_never_paired(self):
cloud_node = testutil.cloud_node_mock(2)
setup = self.start_node_boot(cloud_node)
+ self.cloud_factory().node_start_time.return_value = time.time() - 3601
self.daemon.node_up(setup).get(self.TIMEOUT)
self.assertEqual(1, self.alive_monitor_count())
self.daemon.update_cloud_nodes([cloud_node])
- self.timer.deliver()
+ self.monitor_list()[0].tell_proxy().consider_shutdown()
+ self.busywait(lambda: self.node_shutdown.start.called)
self.stop_proxy(self.daemon)
self.assertShutdownCancellable(False)
@@ -384,11 +396,12 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
cloud_node = testutil.cloud_node_mock(4)
arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
setup = self.start_node_boot(cloud_node, arv_node)
+ self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
self.daemon.node_up(setup).get(self.TIMEOUT)
self.assertEqual(1, self.alive_monitor_count())
+ self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-3601
self.daemon.update_cloud_nodes([cloud_node])
- self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
- self.timer.deliver()
+ self.busywait(lambda: self.node_shutdown.start.called)
self.stop_proxy(self.daemon)
self.assertShutdownCancellable(False)
commit e6b3755e7447ad14026ce79418e1c2c7f0ef62d1
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed May 11 16:55:00 2016 -0400
9161: There's a window between when a node pings for the first time and the
value of 'slurm_state' is synchronized by crunch-dispatch. In this window, the
node will still report as 'down'. Check first_ping_at and implement a grace
period where the node will be considered 'idle'.
diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
index 412a5d7..0d63dc9 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
@@ -350,6 +350,16 @@ class ComputeNodeMonitorActor(config.actor_class):
state = self.arvados_node['crunch_worker_state']
if not state:
return None
+
+ # There's a window between when a node pings for the first time and the
+ # value of 'slurm_state' is synchronized by crunch-dispatch. In this
+ # window, the node will still report as 'down'. Check first_ping_at
+ # and implement a grace period where the node should will be considered
+ # 'idle'.
+ if state == 'down' and timestamp_fresh(
+ arvados_timestamp(self.arvados_node['first_ping_at']), self.poll_stale_after):
+ state = 'idle'
+
result = state in states
if state == 'idle':
result = result and not self.arvados_node['job_uuid']
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list