[ARVADOS] created: f33920ae83ed990135a00e96f7ce8b6d8c2f06bd

Git user git at public.curoverse.com
Tue Jun 6 15:08:53 EDT 2017


        at  f33920ae83ed990135a00e96f7ce8b6d8c2f06bd (commit)


commit f33920ae83ed990135a00e96f7ce8b6d8c2f06bd
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Jun 6 14:05:44 2017 -0400

    10847: Simplify and consolodate retry for API throttling errors.
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curoverse.com>

diff --git a/services/nodemanager/arvnodeman/computenode/__init__.py b/services/nodemanager/arvnodeman/computenode/__init__.py
index 20b274b..7cf9d63 100644
--- a/services/nodemanager/arvnodeman/computenode/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/__init__.py
@@ -8,6 +8,8 @@ import itertools
 import re
 import time
 
+from libcloud.common.exceptions import BaseHTTPError
+
 ARVADOS_TIMEFMT = '%Y-%m-%dT%H:%M:%SZ'
 ARVADOS_TIMESUBSEC_RE = re.compile(r'(\.\d+)Z$')
 
@@ -73,11 +75,24 @@ class RetryMixin(object):
                     try:
                         ret = orig_func(self, *args, **kwargs)
                     except Exception as error:
-                        if not (isinstance(error, errors) or
-                                self._cloud.is_cloud_exception(error)):
+                        should_retry = False
+
+                        if isinstance(error, BaseHTTPError):
+                            if error.headers and error.headers.get("retry-after"):
+                                try:
+                                    self.retry_wait = int(error.headers["retry-after"])
+                                    should_retry = True
+                                except ValueError:
+                                    pass
+                            if error.code == 429 or error.code >= 500:
+                                should_retry = True
+                        elif isinstance(error, errors):
+                            should_retry = True
+
+                        if not should_retry:
                             self.retry_wait = self.min_retry_wait
                             self._logger.warning(
-                                "Re-raising unknown error (no retry): %s",
+                                "Re-raising error (no retry): %s",
                                 error, exc_info=error)
                             raise
 
diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
index 63dac3f..8a397dc 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
@@ -8,6 +8,8 @@ import time
 import re
 
 import libcloud.common.types as cloud_types
+from libcloud.common.exceptions import BaseHTTPError
+
 import pykka
 
 from .. import \
@@ -126,7 +128,12 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
         try:
             self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                       self.arvados_node)
-        except Exception as e:
+        except BaseHTTPError as e:
+            if e.code == 429 or "RequestLimitExceeded" in e.message:
+                # Don't consider API rate limits to be quota errors.
+                # re-raise so the Retry logic applies.
+                raise
+
             # The set of possible error codes / messages isn't documented for
             # all clouds, so use a keyword heuristic to determine if the
             # failure is likely due to a quota.
@@ -136,7 +143,10 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
                 self._finished()
                 return
             else:
+                # Something else happened, re-raise so the Retry logic applies.
                 raise
+        except Exception as e:
+            raise
 
         # The information included in the node size object we get from libcloud
         # is inconsistent between cloud drivers.  Replace libcloud NodeSize
@@ -272,7 +282,7 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
         self._finished(success_flag=True)
 
 
-class ComputeNodeUpdateActor(config.actor_class):
+class ComputeNodeUpdateActor(config.actor_class, RetryMixin):
     """Actor to dispatch one-off cloud management requests.
 
     This actor receives requests for small cloud updates, and
@@ -281,12 +291,11 @@ class ComputeNodeUpdateActor(config.actor_class):
     dedicated actor for this gives us the opportunity to control the
     flow of requests; e.g., by backing off when errors occur.
     """
-    def __init__(self, cloud_factory, max_retry_wait=180):
+    def __init__(self, cloud_factory, timer_actor, max_retry_wait=180):
         super(ComputeNodeUpdateActor, self).__init__()
+        RetryMixin.__init__(self, 1, max_retry_wait,
+                            None, cloud_factory(), timer_actor)
         self._cloud = cloud_factory()
-        self.max_retry_wait = max_retry_wait
-        self.error_streak = 0
-        self.next_request_time = time.time()
 
     def _set_logger(self):
         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
@@ -294,28 +303,7 @@ class ComputeNodeUpdateActor(config.actor_class):
     def on_start(self):
         self._set_logger()
 
-    def _throttle_errors(orig_func):
-        @functools.wraps(orig_func)
-        def throttle_wrapper(self, *args, **kwargs):
-            throttle_time = self.next_request_time - time.time()
-            if throttle_time > 0:
-                time.sleep(throttle_time)
-            self.next_request_time = time.time()
-            try:
-                result = orig_func(self, *args, **kwargs)
-            except Exception as error:
-                if self._cloud.is_cloud_exception(error):
-                    self.error_streak += 1
-                    self.next_request_time += min(2 ** self.error_streak,
-                                                  self.max_retry_wait)
-                self._logger.warn(
-                    "Unhandled exception: %s", error, exc_info=error)
-            else:
-                self.error_streak = 0
-                return result
-        return throttle_wrapper
-
-    @_throttle_errors
+    @RetryMixin._retry()
     def sync_node(self, cloud_node, arvados_node):
         return self._cloud.sync_node(cloud_node, arvados_node)
 
diff --git a/services/nodemanager/arvnodeman/computenode/driver/__init__.py b/services/nodemanager/arvnodeman/computenode/driver/__init__.py
index 4420341..6d23c2b 100644
--- a/services/nodemanager/arvnodeman/computenode/driver/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/driver/__init__.py
@@ -25,7 +25,7 @@ class BaseComputeNodeDriver(RetryMixin):
     Subclasses must implement arvados_create_kwargs, sync_node,
     node_fqdn, and node_start_time.
     """
-    CLOUD_ERRORS = NETWORK_ERRORS + (cloud_types.LibcloudError, BaseHTTPError)
+    CLOUD_ERRORS = NETWORK_ERRORS + (cloud_types.LibcloudError,)
 
     @RetryMixin._retry()
     def _create_driver(self, driver_class, **auth_kwargs):
@@ -206,14 +206,6 @@ class BaseComputeNodeDriver(RetryMixin):
         # seconds since the epoch UTC.
         raise NotImplementedError("BaseComputeNodeDriver.node_start_time")
 
-    @classmethod
-    def is_cloud_exception(cls, exception):
-        # libcloud compute drivers typically raise bare Exceptions to
-        # represent API errors.  Return True for any exception that is
-        # exactly an Exception, or a better-known higher-level exception.
-        return (isinstance(exception, cls.CLOUD_ERRORS) or
-                type(exception) is Exception)
-
     def destroy_node(self, cloud_node):
         try:
             return self.real.destroy_node(cloud_node)
diff --git a/services/nodemanager/arvnodeman/computenode/driver/azure.py b/services/nodemanager/arvnodeman/computenode/driver/azure.py
index e293d1b..6e7392a 100644
--- a/services/nodemanager/arvnodeman/computenode/driver/azure.py
+++ b/services/nodemanager/arvnodeman/computenode/driver/azure.py
@@ -17,7 +17,7 @@ class ComputeNodeDriver(BaseComputeNodeDriver):
 
     DEFAULT_DRIVER = cloud_provider.get_driver(cloud_types.Provider.AZURE_ARM)
     SEARCH_CACHE = {}
-    CLOUD_ERRORS = BaseComputeNodeDriver.CLOUD_ERRORS + (BaseHTTPError,)
+    CLOUD_ERRORS = BaseComputeNodeDriver.CLOUD_ERRORS
 
     def __init__(self, auth_kwargs, list_kwargs, create_kwargs,
                  driver_class=DEFAULT_DRIVER):
diff --git a/services/nodemanager/arvnodeman/launcher.py b/services/nodemanager/arvnodeman/launcher.py
index 3cd097a..72a285b 100644
--- a/services/nodemanager/arvnodeman/launcher.py
+++ b/services/nodemanager/arvnodeman/launcher.py
@@ -128,7 +128,7 @@ def main(args=None):
         server_calculator = build_server_calculator(config)
         timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
             launch_pollers(config, server_calculator)
-        cloud_node_updater = node_update.start(config.new_cloud_client).tell_proxy()
+        cloud_node_updater = node_update.start(config.new_cloud_client, timer).tell_proxy()
         node_daemon = NodeManagerDaemonActor.start(
             job_queue_poller, arvados_node_poller, cloud_node_poller,
             cloud_node_updater, timer,
diff --git a/services/nodemanager/arvnodeman/test/fake_driver.py b/services/nodemanager/arvnodeman/test/fake_driver.py
index 1785e05..8e7cf2f 100644
--- a/services/nodemanager/arvnodeman/test/fake_driver.py
+++ b/services/nodemanager/arvnodeman/test/fake_driver.py
@@ -97,3 +97,31 @@ class FailingDriver(FakeDriver):
                     ex_tags=None,
                     ex_network=None):
         raise Exception("nope")
+
+class RetryDriver(FakeDriver):
+    def create_node(self, name=None,
+                    size=None,
+                    image=None,
+                    auth=None,
+                    ex_storage_account=None,
+                    ex_customdata=None,
+                    ex_resource_group=None,
+                    ex_user_name=None,
+                    ex_tags=None,
+                    ex_network=None):
+        global create_calls
+        create_calls += 1
+        if create_calls < 2:
+            raise BaseHTTPError(429, "Rate limit exceeded",
+                                {'retry-after': '12'})
+        else:
+            return super(RetryDriver, self).create_node(name=name,
+                    size=size,
+                    image=image,
+                    auth=auth,
+                    ex_storage_account=ex_storage_account,
+                    ex_customdata=ex_customdata,
+                    ex_resource_group=ex_resource_group,
+                    ex_user_name=ex_user_name,
+                    ex_tags=ex_tags,
+                    ex_network=ex_network)
diff --git a/services/nodemanager/tests/integration_test.py b/services/nodemanager/tests/integration_test.py
index cb33880..f945c7a 100755
--- a/services/nodemanager/tests/integration_test.py
+++ b/services/nodemanager/tests/integration_test.py
@@ -298,6 +298,16 @@ def main():
              "34t0i-dz642-h42bg3hq4bdfpf2": "ReqNodeNotAvail",
              "34t0i-dz642-h42bg3hq4bdfpf3": "ReqNodeNotAvail",
              "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"
+         }),
+        "test6": (
+            [
+                (r".*Daemon started", set_squeue),
+                (r".*Rate limit exceeded - scheduling retry in 12 seconds", noop),
+                (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", noop),
+            ],
+            {},
+            "arvnodeman.test.fake_driver.RetryDriver",
+            {"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail"
          })
     }
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list