[ARVADOS] created: 9362030c75c09e363f95dcf742f79570107928d1

git at public.curoverse.com git at public.curoverse.com
Tue Oct 21 14:31:00 EDT 2014


        at  9362030c75c09e363f95dcf742f79570107928d1 (commit)


commit 9362030c75c09e363f95dcf742f79570107928d1
Author: Brett Smith <brett at curoverse.com>
Date:   Tue Oct 21 14:31:44 2014 -0400

    4139: Improve scheduling of Node Manager polls.
    
    * Catch all exceptions, so that we keep polling no matter what
      happens.  Use CLIENT_ERRORS as a hint about how much logging we
      need.
    * Make the next poll time calculation a little less stateful and
      easier to follow.
    * Add tests for poll scheduling.

diff --git a/services/nodemanager/arvnodeman/clientactor.py b/services/nodemanager/arvnodeman/clientactor.py
index d6b220f..46a103e 100644
--- a/services/nodemanager/arvnodeman/clientactor.py
+++ b/services/nodemanager/arvnodeman/clientactor.py
@@ -34,25 +34,27 @@ class RemotePollLoopActor(actor_class):
     If you also define an _item_key method, this class will support
     subscribing to a specific item by key in responses.
     """
+    CLIENT_ERRORS = ()
+
     def __init__(self, client, timer_actor, poll_wait=60, max_poll_wait=180):
         super(RemotePollLoopActor, self).__init__()
         self._client = client
         self._timer = timer_actor
         self._logger = logging.getLogger(self.LOGGER_NAME)
         self._later = self.actor_ref.proxy()
+        self._polling_started = False
         self.log_prefix = "{} (at {})".format(self.__class__.__name__, id(self))
         self.min_poll_wait = poll_wait
         self.max_poll_wait = max_poll_wait
         self.poll_wait = self.min_poll_wait
-        self.last_poll_time = None
         self.all_subscribers = set()
         self.key_subscribers = {}
         if hasattr(self, '_item_key'):
             self.subscribe_to = self._subscribe_to
 
     def _start_polling(self):
-        if self.last_poll_time is None:
-            self.last_poll_time = time.time()
+        if not self._polling_started:
+            self._polling_started = True
             self._later.poll()
 
     def subscribe(self, subscriber):
@@ -82,19 +84,27 @@ class RemotePollLoopActor(actor_class):
 
     def _got_error(self, error):
         self.poll_wait = min(self.poll_wait * 2, self.max_poll_wait)
-        self._logger.warning("%s got error: %s - waiting %s seconds",
-                             self.log_prefix, error, self.poll_wait)
+        return "{} got error: {} - waiting {} seconds".format(
+            self.log_prefix, error, self.poll_wait)
 
-    def poll(self):
+    def poll(self, scheduled_start=None):
         self._logger.debug("%s sending poll", self.log_prefix)
         start_time = time.time()
+        if scheduled_start is None:
+            scheduled_start = start_time
         try:
             response = self._send_request()
-        except self.CLIENT_ERRORS as error:
-            self.last_poll_time = start_time
-            self._got_error(error)
+        except Exception as error:
+            errmsg = self._got_error(error)
+            if isinstance(error, self.CLIENT_ERRORS):
+                self._logger.warning(errmsg)
+            else:
+                self._logger.exception(errmsg)
+            next_poll = start_time + self.poll_wait
         else:
-            self.last_poll_time += self.poll_wait
             self._got_response(response)
-        self._timer.schedule(self.last_poll_time + self.poll_wait,
-                             self._later.poll)
+            next_poll = scheduled_start + self.poll_wait
+        end_time = time.time()
+        if next_poll < end_time:  # We've drifted too much; start fresh.
+            next_poll = end_time + self.poll_wait
+        self._timer.schedule(next_poll, self._later.poll, next_poll)
diff --git a/services/nodemanager/tests/test_clientactor.py b/services/nodemanager/tests/test_clientactor.py
index 1e4c40e..57a0d32 100644
--- a/services/nodemanager/tests/test_clientactor.py
+++ b/services/nodemanager/tests/test_clientactor.py
@@ -64,6 +64,31 @@ class RemotePollLoopActorTestCase(testutil.RemotePollLoopActorTestMixin,
                         "poll loop died from dead subscriber")
         self.subscriber.assert_called_with('survive2')
 
+    def check_poll_timers(self, *test_times):
+        schedule_mock = self.timer.schedule
+        last_expect = None
+        with mock.patch('time.time') as time_mock:
+            for fake_time, expect_next in test_times:
+                time_mock.return_value = fake_time
+                self.monitor.poll(last_expect).get(self.TIMEOUT)
+                self.assertTrue(schedule_mock.called)
+                self.assertEqual(expect_next, schedule_mock.call_args[0][0])
+                schedule_mock.reset_mock()
+                last_expect = expect_next
+
+    def test_poll_timing_on_consecutive_successes_with_drift(self):
+        self.build_monitor(['1', '2'], poll_wait=3, max_poll_wait=14)
+        self.check_poll_timers((0, 3), (4, 6))
+
+    def test_poll_backoff_on_failures(self):
+        self.build_monitor(self.MockClientError, poll_wait=3, max_poll_wait=14)
+        self.check_poll_timers((0, 6), (6, 18), (18, 32))
+
+    def test_poll_timing_after_error_recovery(self):
+        self.build_monitor(['a', self.MockClientError(), 'b'],
+                           poll_wait=3, max_poll_wait=14)
+        self.check_poll_timers((0, 3), (4, 10), (10, 13))
+
     def test_no_subscriptions_by_key_without_support(self):
         self.build_monitor([])
         with self.assertRaises(AttributeError):

commit 4fb27993c2989648fab6fa29e2073a65b395379d
Author: Brett Smith <brett at curoverse.com>
Date:   Tue Oct 21 14:13:37 2014 -0400

    4139: Improve logging in Node Manager poll actors.

diff --git a/services/nodemanager/arvnodeman/clientactor.py b/services/nodemanager/arvnodeman/clientactor.py
index 77d85d6..d6b220f 100644
--- a/services/nodemanager/arvnodeman/clientactor.py
+++ b/services/nodemanager/arvnodeman/clientactor.py
@@ -40,6 +40,7 @@ class RemotePollLoopActor(actor_class):
         self._timer = timer_actor
         self._logger = logging.getLogger(self.LOGGER_NAME)
         self._later = self.actor_ref.proxy()
+        self.log_prefix = "{} (at {})".format(self.__class__.__name__, id(self))
         self.min_poll_wait = poll_wait
         self.max_poll_wait = max_poll_wait
         self.poll_wait = self.min_poll_wait
@@ -70,6 +71,8 @@ class RemotePollLoopActor(actor_class):
         raise NotImplementedError("subclasses must implement request method")
 
     def _got_response(self, response):
+        self._logger.debug("%s got response with %d items",
+                           self.log_prefix, len(response))
         self.poll_wait = self.min_poll_wait
         _notify_subscribers(response, self.all_subscribers)
         if hasattr(self, '_item_key'):
@@ -79,10 +82,11 @@ class RemotePollLoopActor(actor_class):
 
     def _got_error(self, error):
         self.poll_wait = min(self.poll_wait * 2, self.max_poll_wait)
-        self._logger.warning("Client error: %s - waiting %s seconds",
-                             error, self.poll_wait)
+        self._logger.warning("%s got error: %s - waiting %s seconds",
+                             self.log_prefix, error, self.poll_wait)
 
     def poll(self):
+        self._logger.debug("%s sending poll", self.log_prefix)
         start_time = time.time()
         try:
             response = self._send_request()
diff --git a/services/nodemanager/arvnodeman/jobqueue.py b/services/nodemanager/arvnodeman/jobqueue.py
index 08ee12e..0eb5b79 100644
--- a/services/nodemanager/arvnodeman/jobqueue.py
+++ b/services/nodemanager/arvnodeman/jobqueue.py
@@ -2,6 +2,8 @@
 
 from __future__ import absolute_import, print_function
 
+import logging
+
 from . import clientactor
 from .config import ARVADOS_ERRORS
 
@@ -40,7 +42,9 @@ class ServerCalculator(object):
         self.cloud_sizes = [self.CloudSizeWrapper(s, **kws)
                             for s, kws in server_list]
         self.cloud_sizes.sort(key=lambda s: s.price)
-        self.max_nodes = max_nodes or float("inf")
+        self.max_nodes = max_nodes or float('inf')
+        self.logger = logging.getLogger('arvnodeman.jobqueue')
+        self.logged_jobs = set()
 
     @staticmethod
     def coerce_int(x, fallback):
@@ -61,12 +65,19 @@ class ServerCalculator(object):
 
     def servers_for_queue(self, queue):
         servers = []
+        seen_jobs = set()
         for job in queue:
+            seen_jobs.add(job['uuid'])
             constraints = job['runtime_constraints']
             want_count = self.coerce_int(constraints.get('min_nodes'), 1)
             cloud_size = self.cloud_size_for_constraints(constraints)
-            if (want_count < self.max_nodes) and (cloud_size is not None):
+            if cloud_size is None:
+                if job['uuid'] not in self.logged_jobs:
+                    self.logged_jobs.add(job['uuid'])
+                    self.logger.debug("job %s not satisfiable", job['uuid'])
+            elif (want_count < self.max_nodes):
                 servers.extend([cloud_size.real] * max(1, want_count))
+        self.logged_jobs.intersection_update(seen_jobs)
         return servers
 
 
@@ -92,5 +103,5 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
     def _got_response(self, queue):
         server_list = self._calculator.servers_for_queue(queue)
         self._logger.debug("Sending server wishlist: %s",
-                           ', '.join(s.name for s in server_list))
+                           ', '.join(s.name for s in server_list) or "(empty)")
         return super(JobQueueMonitorActor, self)._got_response(server_list)
diff --git a/services/nodemanager/tests/test_jobqueue.py b/services/nodemanager/tests/test_jobqueue.py
index b27f69f..0a4d136 100644
--- a/services/nodemanager/tests/test_jobqueue.py
+++ b/services/nodemanager/tests/test_jobqueue.py
@@ -14,7 +14,9 @@ class ServerCalculatorTestCase(unittest.TestCase):
 
     def calculate(self, servcalc, *constraints):
         return servcalc.servers_for_queue(
-            [{'runtime_constraints': cdict} for cdict in constraints])
+            [{'uuid': 'zzzzz-jjjjj-{:015x}'.format(index),
+              'runtime_constraints': cdict}
+             for index, cdict in enumerate(constraints)])
 
     def test_empty_queue_needs_no_servers(self):
         servcalc = self.make_calculator([1])

commit 15f367505326b375b17f03e4c54a751337155df0
Author: Brett Smith <brett at curoverse.com>
Date:   Tue Oct 21 11:19:05 2014 -0400

    4139: Node Manager callback actor avoids redundant delivery calls.
    
    This helps make it more responsive when it receives a shutdown
    request.

diff --git a/services/nodemanager/arvnodeman/timedcallback.py b/services/nodemanager/arvnodeman/timedcallback.py
index a1df8ec..615f798 100644
--- a/services/nodemanager/arvnodeman/timedcallback.py
+++ b/services/nodemanager/arvnodeman/timedcallback.py
@@ -23,8 +23,9 @@ class TimedCallBackActor(actor_class):
         self.max_sleep = max_sleep
 
     def schedule(self, delivery_time, receiver, *args, **kwargs):
+        if not self.messages:
+            self._proxy.deliver()
         heapq.heappush(self.messages, (delivery_time, receiver, args, kwargs))
-        self._proxy.deliver()
 
     def deliver(self):
         if not self.messages:

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list