[ARVADOS] updated: 4aaa1f6aeae33dc353ea3a70f901065b4f694fed

Git user git at public.curoverse.com
Thu Jun 16 19:08:05 EDT 2016


Summary of changes:
 sdk/python/arvados/keep.py           | 26 +++++++++++---
 sdk/python/tests/test_keep_client.py | 69 ++++++++++++++++++++++++++++++------
 2 files changed, 81 insertions(+), 14 deletions(-)

       via  4aaa1f6aeae33dc353ea3a70f901065b4f694fed (commit)
      from  5862c01a952d410b1237dc2a4a86cb844652a8d0 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 4aaa1f6aeae33dc353ea3a70f901065b4f694fed
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Thu Jun 16 20:06:21 2016 -0300

    9180: Changed some of the logic on ThreadLimiter and made unit tests to validate the new behaviour
    refs #9180

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 3c0ad6f..9e9fb00 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -251,6 +251,7 @@ class KeepClient(object):
             self._started = 0
             self._want_copies = want_copies
             self._done = 0
+            self._thread_failures = 0
             self._response = None
             self._start_lock = threading.Condition()
             if (not max_service_replicas) or (max_service_replicas >= want_copies):
@@ -260,6 +261,7 @@ class KeepClient(object):
             _logger.debug("Limiter max threads is %d", max_threads)
             self._todo_lock = threading.Semaphore(max_threads)
             self._done_lock = threading.Lock()
+            self._thread_failures_lock = threading.Lock()
             self._local = threading.local()
 
         def __enter__(self):
@@ -276,7 +278,14 @@ class KeepClient(object):
             return self
 
         def __exit__(self, type, value, traceback):
-            self._todo_lock.release()
+            with self._thread_failures_lock:
+                if self._thread_failures > 0:
+                    self._thread_failures -= 1
+                    self._todo_lock.release()
+
+            # If work is finished, release all pending threads
+            if not self.shall_i_proceed():
+                self._todo_lock.release()
 
         def set_sequence(self, sequence):
             self._local.sequence = sequence
@@ -294,9 +303,14 @@ class KeepClient(object):
             Records a response body (a locator, possibly signed) returned by
             the Keep server, and the number of replicas it stored.
             """
-            with self._done_lock:
-                self._done += replicas_stored
-                self._response = response_body
+            if replicas_stored == 0:
+                # Failure notification, should start a new thread to try to reach full replication
+                with self._thread_failures_lock:
+                    self._thread_failures += 1
+            else:
+                with self._done_lock:
+                    self._done += replicas_stored
+                    self._response = response_body
 
         def response(self):
             """Return the body from the response to a PUT request."""
@@ -612,6 +626,10 @@ class KeepClient(object):
                               self.args['data_hash'],
                               result['status_code'],
                               result['body'])
+            if not self._success:
+                # Notify the failure so that the Thread limiter allows
+                # a new one to run.
+                limiter.save_response(None, 0)
 
 
     def __init__(self, api_client=None, proxy=None,
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index eff344c..33b96fe 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -1064,18 +1064,67 @@ class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
         with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
             self.check_exception(copies=2, num_retries=3)
 
-# @tutil.skip_sleep
+
 class KeepClientAvoidClientOverreplicationTestCase(unittest.TestCase, tutil.ApiClientMock):
-    def setUp(self):
-        self.api_client = self.mock_keep_services(count=4)
-        self.keep_client = arvados.KeepClient(api_client=self.api_client)
 
-    def test_only_send_enough_on_success(self):
-        with tutil.mock_keep_responses(
-                'acbd18db4cc2f85cedef654fccc4a4d8+3',
-                200, 200, 200, 200) as req_mock:
-            self.keep_client.put('foo', num_retries=1, copies=2)
-        self.assertEqual(3, req_mock.call_count)
+
+    class KeepFakeWriterThread(threading.Thread):
+        """
+        Simulates the real KeepClient.KeepWriterThread, to test the ThreadLimiter.
+        """
+        def __init__(self, delay, will_succeed, thread_limiter):
+            super(KeepClientAvoidClientOverreplicationTestCase.KeepFakeWriterThread, self).__init__()
+            self.delay = delay # in seconds
+            self.success = will_succeed
+            self.limiter = thread_limiter
+
+        def run(self):
+            with self.limiter:
+                if not self.limiter.shall_i_proceed():
+                    return
+                time.sleep(self.delay)
+                if self.success:
+                    self.limiter.save_response('foo', 1)
+                else:
+                    self.limiter.save_response(None, 0)
+
+    def test_only_write_enough_on_success(self):
+        copies = 3
+        threads = []
+        limiter = arvados.KeepClient.ThreadLimiter(want_copies=copies, max_service_replicas=1)
+        # Setting up fake writer threads with different delays so that the bug is revealed
+        for i in range(copies*2):
+            t = self.KeepFakeWriterThread(
+                    delay=i/10.0,
+                    will_succeed=True,
+                    thread_limiter=limiter)
+            t.start()
+            threads.append(t)
+        for t in threads:
+            t.join()
+        self.assertEqual(limiter.done(), copies)
+
+    def test_only_write_enough_on_partial_failure(self):
+        copies = 3
+        threads = []
+        limiter = arvados.KeepClient.ThreadLimiter(want_copies=copies, max_service_replicas=1)
+        for i in range(copies):
+            t = self.KeepFakeWriterThread(
+                    delay=i/10.0,
+                    will_succeed=False,
+                    thread_limiter=limiter)
+            t.start()
+            threads.append(t)
+            t = self.KeepFakeWriterThread(
+                    delay=i/10.0,
+                    will_succeed=True,
+                    thread_limiter=limiter)
+            t.start()
+            threads.append(t)
+        for t in threads:
+            t.join()
+        self.assertEqual(limiter.done(), copies)
+
 
 @tutil.skip_sleep
 class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list