[ARVADOS] updated: 4aaa1f6aeae33dc353ea3a70f901065b4f694fed
Git user
git at public.curoverse.com
Thu Jun 16 19:08:05 EDT 2016
Summary of changes:
sdk/python/arvados/keep.py | 26 +++++++++++---
sdk/python/tests/test_keep_client.py | 69 ++++++++++++++++++++++++++++++------
2 files changed, 81 insertions(+), 14 deletions(-)
via 4aaa1f6aeae33dc353ea3a70f901065b4f694fed (commit)
from 5862c01a952d410b1237dc2a4a86cb844652a8d0 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 4aaa1f6aeae33dc353ea3a70f901065b4f694fed
Author: Lucas Di Pentima <lucas at curoverse.com>
Date: Thu Jun 16 20:06:21 2016 -0300
9180: Changed some of the logic on ThreadLimiter and made unit tests to validate the new behaviour
refs #9180
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 3c0ad6f..9e9fb00 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -251,6 +251,7 @@ class KeepClient(object):
self._started = 0
self._want_copies = want_copies
self._done = 0
+ self._thread_failures = 0
self._response = None
self._start_lock = threading.Condition()
if (not max_service_replicas) or (max_service_replicas >= want_copies):
@@ -260,6 +261,7 @@ class KeepClient(object):
_logger.debug("Limiter max threads is %d", max_threads)
self._todo_lock = threading.Semaphore(max_threads)
self._done_lock = threading.Lock()
+ self._thread_failures_lock = threading.Lock()
self._local = threading.local()
def __enter__(self):
@@ -276,7 +278,14 @@ class KeepClient(object):
return self
def __exit__(self, type, value, traceback):
- self._todo_lock.release()
+ with self._thread_failures_lock:
+ if self._thread_failures > 0:
+ self._thread_failures -= 1
+ self._todo_lock.release()
+
+ # If work is finished, release all pending threads
+ if not self.shall_i_proceed():
+ self._todo_lock.release()
def set_sequence(self, sequence):
self._local.sequence = sequence
@@ -294,9 +303,14 @@ class KeepClient(object):
Records a response body (a locator, possibly signed) returned by
the Keep server, and the number of replicas it stored.
"""
- with self._done_lock:
- self._done += replicas_stored
- self._response = response_body
+ if replicas_stored == 0:
+ # Failure notification, should start a new thread to try to reach full replication
+ with self._thread_failures_lock:
+ self._thread_failures += 1
+ else:
+ with self._done_lock:
+ self._done += replicas_stored
+ self._response = response_body
def response(self):
"""Return the body from the response to a PUT request."""
@@ -612,6 +626,10 @@ class KeepClient(object):
self.args['data_hash'],
result['status_code'],
result['body'])
+ if not self._success:
+ # Notify the failure so that the Thread limiter allows
+ # a new one to run.
+ limiter.save_response(None, 0)
def __init__(self, api_client=None, proxy=None,
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index eff344c..33b96fe 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -1064,18 +1064,67 @@ class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
self.check_exception(copies=2, num_retries=3)
-# @tutil.skip_sleep
+
class KeepClientAvoidClientOverreplicationTestCase(unittest.TestCase, tutil.ApiClientMock):
- def setUp(self):
- self.api_client = self.mock_keep_services(count=4)
- self.keep_client = arvados.KeepClient(api_client=self.api_client)
- def test_only_send_enough_on_success(self):
- with tutil.mock_keep_responses(
- 'acbd18db4cc2f85cedef654fccc4a4d8+3',
- 200, 200, 200, 200) as req_mock:
- self.keep_client.put('foo', num_retries=1, copies=2)
- self.assertEqual(3, req_mock.call_count)
+
+ class KeepFakeWriterThread(threading.Thread):
+ """
+ Simulates the real KeepClient.KeepWriterThread, to test the ThreadLimiter.
+ """
+ def __init__(self, delay, will_succeed, thread_limiter):
+ super(KeepClientAvoidClientOverreplicationTestCase.KeepFakeWriterThread, self).__init__()
+ self.delay = delay # in seconds
+ self.success = will_succeed
+ self.limiter = thread_limiter
+
+ def run(self):
+ with self.limiter:
+ if not self.limiter.shall_i_proceed():
+ return
+ time.sleep(self.delay)
+ if self.success:
+ self.limiter.save_response('foo', 1)
+ else:
+ self.limiter.save_response(None, 0)
+
+ def test_only_write_enough_on_success(self):
+ copies = 3
+ threads = []
+ limiter = arvados.KeepClient.ThreadLimiter(want_copies=copies, max_service_replicas=1)
+ # Setting up fake writer threads with different delays so that the bug is revealed
+ for i in range(copies*2):
+ t = self.KeepFakeWriterThread(
+ delay=i/10.0,
+ will_succeed=True,
+ thread_limiter=limiter)
+ t.start()
+ threads.append(t)
+ for t in threads:
+ t.join()
+ self.assertEqual(limiter.done(), copies)
+
+ def test_only_write_enough_on_partial_failure(self):
+ copies = 3
+ threads = []
+ limiter = arvados.KeepClient.ThreadLimiter(want_copies=copies, max_service_replicas=1)
+ for i in range(copies):
+ t = self.KeepFakeWriterThread(
+ delay=i/10.0,
+ will_succeed=False,
+ thread_limiter=limiter)
+ t.start()
+ threads.append(t)
+ t = self.KeepFakeWriterThread(
+ delay=i/10.0,
+ will_succeed=True,
+ thread_limiter=limiter)
+ t.start()
+ threads.append(t)
+ for t in threads:
+ t.join()
+ self.assertEqual(limiter.done(), copies)
+
@tutil.skip_sleep
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list