[ARVADOS] created: 9362030c75c09e363f95dcf742f79570107928d1
git at public.curoverse.com
git at public.curoverse.com
Tue Oct 21 14:31:00 EDT 2014
at 9362030c75c09e363f95dcf742f79570107928d1 (commit)
commit 9362030c75c09e363f95dcf742f79570107928d1
Author: Brett Smith <brett at curoverse.com>
Date: Tue Oct 21 14:31:44 2014 -0400
4139: Improve scheduling of Node Manager polls.
* Catch all exceptions, so that we keep polling no matter what
happens. Use CLIENT_ERRORS as a hint about how much logging we
need.
* Make the next poll time calculation a little less stateful and
easier to follow.
* Add tests for poll scheduling.
diff --git a/services/nodemanager/arvnodeman/clientactor.py b/services/nodemanager/arvnodeman/clientactor.py
index d6b220f..46a103e 100644
--- a/services/nodemanager/arvnodeman/clientactor.py
+++ b/services/nodemanager/arvnodeman/clientactor.py
@@ -34,25 +34,27 @@ class RemotePollLoopActor(actor_class):
If you also define an _item_key method, this class will support
subscribing to a specific item by key in responses.
"""
+ CLIENT_ERRORS = ()
+
def __init__(self, client, timer_actor, poll_wait=60, max_poll_wait=180):
super(RemotePollLoopActor, self).__init__()
self._client = client
self._timer = timer_actor
self._logger = logging.getLogger(self.LOGGER_NAME)
self._later = self.actor_ref.proxy()
+ self._polling_started = False
self.log_prefix = "{} (at {})".format(self.__class__.__name__, id(self))
self.min_poll_wait = poll_wait
self.max_poll_wait = max_poll_wait
self.poll_wait = self.min_poll_wait
- self.last_poll_time = None
self.all_subscribers = set()
self.key_subscribers = {}
if hasattr(self, '_item_key'):
self.subscribe_to = self._subscribe_to
def _start_polling(self):
- if self.last_poll_time is None:
- self.last_poll_time = time.time()
+ if not self._polling_started:
+ self._polling_started = True
self._later.poll()
def subscribe(self, subscriber):
@@ -82,19 +84,27 @@ class RemotePollLoopActor(actor_class):
def _got_error(self, error):
self.poll_wait = min(self.poll_wait * 2, self.max_poll_wait)
- self._logger.warning("%s got error: %s - waiting %s seconds",
- self.log_prefix, error, self.poll_wait)
+ return "{} got error: {} - waiting {} seconds".format(
+ self.log_prefix, error, self.poll_wait)
- def poll(self):
+ def poll(self, scheduled_start=None):
self._logger.debug("%s sending poll", self.log_prefix)
start_time = time.time()
+ if scheduled_start is None:
+ scheduled_start = start_time
try:
response = self._send_request()
- except self.CLIENT_ERRORS as error:
- self.last_poll_time = start_time
- self._got_error(error)
+ except Exception as error:
+ errmsg = self._got_error(error)
+ if isinstance(error, self.CLIENT_ERRORS):
+ self._logger.warning(errmsg)
+ else:
+ self._logger.exception(errmsg)
+ next_poll = start_time + self.poll_wait
else:
- self.last_poll_time += self.poll_wait
self._got_response(response)
- self._timer.schedule(self.last_poll_time + self.poll_wait,
- self._later.poll)
+ next_poll = scheduled_start + self.poll_wait
+ end_time = time.time()
+ if next_poll < end_time: # We've drifted too much; start fresh.
+ next_poll = end_time + self.poll_wait
+ self._timer.schedule(next_poll, self._later.poll, next_poll)
diff --git a/services/nodemanager/tests/test_clientactor.py b/services/nodemanager/tests/test_clientactor.py
index 1e4c40e..57a0d32 100644
--- a/services/nodemanager/tests/test_clientactor.py
+++ b/services/nodemanager/tests/test_clientactor.py
@@ -64,6 +64,31 @@ class RemotePollLoopActorTestCase(testutil.RemotePollLoopActorTestMixin,
"poll loop died from dead subscriber")
self.subscriber.assert_called_with('survive2')
+ def check_poll_timers(self, *test_times):
+ schedule_mock = self.timer.schedule
+ last_expect = None
+ with mock.patch('time.time') as time_mock:
+ for fake_time, expect_next in test_times:
+ time_mock.return_value = fake_time
+ self.monitor.poll(last_expect).get(self.TIMEOUT)
+ self.assertTrue(schedule_mock.called)
+ self.assertEqual(expect_next, schedule_mock.call_args[0][0])
+ schedule_mock.reset_mock()
+ last_expect = expect_next
+
+ def test_poll_timing_on_consecutive_successes_with_drift(self):
+ self.build_monitor(['1', '2'], poll_wait=3, max_poll_wait=14)
+ self.check_poll_timers((0, 3), (4, 6))
+
+ def test_poll_backoff_on_failures(self):
+ self.build_monitor(self.MockClientError, poll_wait=3, max_poll_wait=14)
+ self.check_poll_timers((0, 6), (6, 18), (18, 32))
+
+ def test_poll_timing_after_error_recovery(self):
+ self.build_monitor(['a', self.MockClientError(), 'b'],
+ poll_wait=3, max_poll_wait=14)
+ self.check_poll_timers((0, 3), (4, 10), (10, 13))
+
def test_no_subscriptions_by_key_without_support(self):
self.build_monitor([])
with self.assertRaises(AttributeError):
commit 4fb27993c2989648fab6fa29e2073a65b395379d
Author: Brett Smith <brett at curoverse.com>
Date: Tue Oct 21 14:13:37 2014 -0400
4139: Improve logging in Node Manager poll actors.
diff --git a/services/nodemanager/arvnodeman/clientactor.py b/services/nodemanager/arvnodeman/clientactor.py
index 77d85d6..d6b220f 100644
--- a/services/nodemanager/arvnodeman/clientactor.py
+++ b/services/nodemanager/arvnodeman/clientactor.py
@@ -40,6 +40,7 @@ class RemotePollLoopActor(actor_class):
self._timer = timer_actor
self._logger = logging.getLogger(self.LOGGER_NAME)
self._later = self.actor_ref.proxy()
+ self.log_prefix = "{} (at {})".format(self.__class__.__name__, id(self))
self.min_poll_wait = poll_wait
self.max_poll_wait = max_poll_wait
self.poll_wait = self.min_poll_wait
@@ -70,6 +71,8 @@ class RemotePollLoopActor(actor_class):
raise NotImplementedError("subclasses must implement request method")
def _got_response(self, response):
+ self._logger.debug("%s got response with %d items",
+ self.log_prefix, len(response))
self.poll_wait = self.min_poll_wait
_notify_subscribers(response, self.all_subscribers)
if hasattr(self, '_item_key'):
@@ -79,10 +82,11 @@ class RemotePollLoopActor(actor_class):
def _got_error(self, error):
self.poll_wait = min(self.poll_wait * 2, self.max_poll_wait)
- self._logger.warning("Client error: %s - waiting %s seconds",
- error, self.poll_wait)
+ self._logger.warning("%s got error: %s - waiting %s seconds",
+ self.log_prefix, error, self.poll_wait)
def poll(self):
+ self._logger.debug("%s sending poll", self.log_prefix)
start_time = time.time()
try:
response = self._send_request()
diff --git a/services/nodemanager/arvnodeman/jobqueue.py b/services/nodemanager/arvnodeman/jobqueue.py
index 08ee12e..0eb5b79 100644
--- a/services/nodemanager/arvnodeman/jobqueue.py
+++ b/services/nodemanager/arvnodeman/jobqueue.py
@@ -2,6 +2,8 @@
from __future__ import absolute_import, print_function
+import logging
+
from . import clientactor
from .config import ARVADOS_ERRORS
@@ -40,7 +42,9 @@ class ServerCalculator(object):
self.cloud_sizes = [self.CloudSizeWrapper(s, **kws)
for s, kws in server_list]
self.cloud_sizes.sort(key=lambda s: s.price)
- self.max_nodes = max_nodes or float("inf")
+ self.max_nodes = max_nodes or float('inf')
+ self.logger = logging.getLogger('arvnodeman.jobqueue')
+ self.logged_jobs = set()
@staticmethod
def coerce_int(x, fallback):
@@ -61,12 +65,19 @@ class ServerCalculator(object):
def servers_for_queue(self, queue):
servers = []
+ seen_jobs = set()
for job in queue:
+ seen_jobs.add(job['uuid'])
constraints = job['runtime_constraints']
want_count = self.coerce_int(constraints.get('min_nodes'), 1)
cloud_size = self.cloud_size_for_constraints(constraints)
- if (want_count < self.max_nodes) and (cloud_size is not None):
+ if cloud_size is None:
+ if job['uuid'] not in self.logged_jobs:
+ self.logged_jobs.add(job['uuid'])
+ self.logger.debug("job %s not satisfiable", job['uuid'])
+ elif (want_count < self.max_nodes):
servers.extend([cloud_size.real] * max(1, want_count))
+ self.logged_jobs.intersection_update(seen_jobs)
return servers
@@ -92,5 +103,5 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
def _got_response(self, queue):
server_list = self._calculator.servers_for_queue(queue)
self._logger.debug("Sending server wishlist: %s",
- ', '.join(s.name for s in server_list))
+ ', '.join(s.name for s in server_list) or "(empty)")
return super(JobQueueMonitorActor, self)._got_response(server_list)
diff --git a/services/nodemanager/tests/test_jobqueue.py b/services/nodemanager/tests/test_jobqueue.py
index b27f69f..0a4d136 100644
--- a/services/nodemanager/tests/test_jobqueue.py
+++ b/services/nodemanager/tests/test_jobqueue.py
@@ -14,7 +14,9 @@ class ServerCalculatorTestCase(unittest.TestCase):
def calculate(self, servcalc, *constraints):
return servcalc.servers_for_queue(
- [{'runtime_constraints': cdict} for cdict in constraints])
+ [{'uuid': 'zzzzz-jjjjj-{:015x}'.format(index),
+ 'runtime_constraints': cdict}
+ for index, cdict in enumerate(constraints)])
def test_empty_queue_needs_no_servers(self):
servcalc = self.make_calculator([1])
commit 15f367505326b375b17f03e4c54a751337155df0
Author: Brett Smith <brett at curoverse.com>
Date: Tue Oct 21 11:19:05 2014 -0400
4139: Node Manager callback actor avoids redundant delivery calls.
This helps the callback actor respond more quickly when it receives a
shutdown request.
diff --git a/services/nodemanager/arvnodeman/timedcallback.py b/services/nodemanager/arvnodeman/timedcallback.py
index a1df8ec..615f798 100644
--- a/services/nodemanager/arvnodeman/timedcallback.py
+++ b/services/nodemanager/arvnodeman/timedcallback.py
@@ -23,8 +23,9 @@ class TimedCallBackActor(actor_class):
self.max_sleep = max_sleep
def schedule(self, delivery_time, receiver, *args, **kwargs):
+ if not self.messages:
+ self._proxy.deliver()
heapq.heappush(self.messages, (delivery_time, receiver, args, kwargs))
- self._proxy.deliver()
def deliver(self):
if not self.messages:
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list