[ARVADOS] updated: 7c2b38632ae2af8a2f0f5c8fa2e55523220a3335
Git user
git at public.curoverse.com
Fri Nov 25 18:10:40 EST 2016
Summary of changes:
sdk/python/arvados/keep.py | 70 ++++++++++++++++++------------------
sdk/python/tests/arvados_testutil.py | 2 --
sdk/python/tests/keepstub.py | 66 +++++-----------------------------
sdk/python/tests/test_collections.py | 69 -----------------------------------
sdk/python/tests/test_keep_client.py | 25 +++++++++----
5 files changed, 63 insertions(+), 169 deletions(-)
discards 15eb9d37a2e5fba757459db6ed992638aee3ca4a (commit)
discards 62857acf3fef2f05228ba69954d44c793bf5982d (commit)
via 7c2b38632ae2af8a2f0f5c8fa2e55523220a3335 (commit)
This update added new revisions after undoing existing revisions. That is
to say, some revisions that were in the old version of the branch are not
in the new version. This situation occurs when you --force push a change
and generate a repository containing something like this:
* -- * -- B -- O -- O -- O (15eb9d37a2e5fba757459db6ed992638aee3ca4a)
           \
            N -- N -- N (7c2b38632ae2af8a2f0f5c8fa2e55523220a3335)
When this happens, we assume that you have already received alert emails for
all of the O revisions, so here we report only the revisions in the N
branch from the common base, B.
The revisions listed above that are new to this repository have
not appeared in any other notification email, so we list them
in full below.
commit 7c2b38632ae2af8a2f0f5c8fa2e55523220a3335
Author: Tom Clegg <tom at curoverse.com>
Date: Fri Nov 25 18:10:16 2016 -0500
10586: Fix deadlock in KeepWriterThreadPool.
Previously, an unhandled exception in KeepWriterThread (including
KeepService.put()) would terminate the thread without calling
queue.task_done(), which would cause queue.join() to deadlock.
Additionally, this commit moves all of the "wait until work is needed"
logic from KeepWriterThread into KeepWriterQueue.
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index db7835b..776c9b2 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -511,8 +511,10 @@ class KeepClient(object):
with self.successful_copies_lock:
self.successful_copies += replicas_nr
self.response = response
+ with self.pending_tries_notification:
+ self.pending_tries_notification.notify_all()
- def write_fail(self, ks, status_code):
+ def write_fail(self, ks):
with self.pending_tries_notification:
self.pending_tries += 1
self.pending_tries_notification.notify()
@@ -520,8 +522,25 @@ class KeepClient(object):
def pending_copies(self):
with self.successful_copies_lock:
return self.wanted_copies - self.successful_copies
-
-
+
+ def get_next_task(self):
+ with self.pending_tries_notification:
+ while True:
+ if self.pending_copies() < 1:
+ # Drain the queue and then raise Queue.Empty
+ while True:
+ self.get_nowait()
+ self.task_done()
+ elif self.pending_tries > 0:
+ self.pending_tries -= 1
+ return self.get_nowait()
+ elif self.empty():
+ self.pending_tries_notification.notify_all()
+ raise Queue.Empty
+ else:
+ self.pending_tries_notification.wait()
+
+
class KeepWriterThreadPool(object):
def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
self.total_task_nr = 0
@@ -551,10 +570,6 @@ class KeepClient(object):
worker.start()
# Wait for finished work
self.queue.join()
- with self.queue.pending_tries_notification:
- self.queue.pending_tries_notification.notify_all()
- for worker in self.workers:
- worker.join()
def response(self):
return self.queue.response
@@ -567,58 +582,51 @@ class KeepClient(object):
self.queue = queue
self.data = data
self.data_hash = data_hash
-
+ self.daemon = True
+
def run(self):
- while not self.queue.empty():
- if self.queue.pending_copies() > 0:
- # Avoid overreplication, wait for some needed re-attempt
- with self.queue.pending_tries_notification:
- if self.queue.pending_tries <= 0:
- self.queue.pending_tries_notification.wait()
- continue # try again when awake
- self.queue.pending_tries -= 1
-
- # Get to work
- try:
- service, service_root = self.queue.get_nowait()
- except Queue.Empty:
- continue
- if service.finished():
- self.queue.task_done()
- continue
- success = bool(service.put(self.data_hash,
- self.data,
- timeout=self.timeout))
- result = service.last_result()
- if success:
- _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
- str(threading.current_thread()),
- self.data_hash,
- len(self.data),
- service_root)
- try:
- replicas_stored = int(result['headers']['x-keep-replicas-stored'])
- except (KeyError, ValueError):
- replicas_stored = 1
-
- self.queue.write_success(result['body'].strip(), replicas_stored)
- else:
- if result.get('status_code', None):
- _logger.debug("Request fail: PUT %s => %s %s",
- self.data_hash,
- result['status_code'],
- result['body'])
- self.queue.write_fail(service, result.get('status_code', None)) # Schedule a re-attempt with next service
- # Mark as done so the queue can be join()ed
- self.queue.task_done()
+ while True:
+ try:
+ service, service_root = self.queue.get_next_task()
+ except Queue.Empty:
+ return
+ try:
+ locator, copies = self.do_task(service, service_root)
+ except Exception as e:
+ _logger.exception("Exception in KeepWriterThread")
+ self.queue.write_fail(service)
else:
- # Remove the task from the queue anyways
- try:
- self.queue.get_nowait()
- # Mark as done so the queue can be join()ed
- self.queue.task_done()
- except Queue.Empty:
- continue
+ self.queue.write_success(locator, copies)
+ finally:
+ self.queue.task_done()
+
+ def do_task(self, service, service_root):
+ if service.finished():
+ return
+ success = bool(service.put(self.data_hash,
+ self.data,
+ timeout=self.timeout))
+ result = service.last_result()
+
+ if not success:
+ if result.get('status_code', None):
+ _logger.debug("Request fail: PUT %s => %s %s",
+ self.data_hash,
+ result['status_code'],
+ result['body'])
+ raise RuntimeError()
+
+ _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
+ str(threading.current_thread()),
+ self.data_hash,
+ len(self.data),
+ service_root)
+ try:
+ replicas_stored = int(result['headers']['x-keep-replicas-stored'])
+ except (KeyError, ValueError):
+ replicas_stored = 1
+
+ return result['body'].strip(), replicas_stored
def __init__(self, api_client=None, proxy=None,
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 908539b..85b5bc8 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -1081,58 +1081,74 @@ class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
self.check_exception(copies=2, num_retries=3)
-class KeepClientAvoidClientOverreplicationTestCase(unittest.TestCase, tutil.ApiClientMock):
-
-
+class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
+
class FakeKeepService(object):
- def __init__(self, delay, will_succeed, replicas=1):
+ def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
self.delay = delay
- self.success = will_succeed
+ self.will_succeed = will_succeed
+ self.will_raise = will_raise
self._result = {}
self._result['headers'] = {}
self._result['headers']['x-keep-replicas-stored'] = str(replicas)
self._result['body'] = 'foobar'
-
+
def put(self, data_hash, data, timeout):
time.sleep(self.delay)
- return self.success
-
+ if self.will_raise is not None:
+ raise self.will_raise
+ return self.will_succeed
+
def last_result(self):
- return self._result
-
+ if self.will_succeed:
+ return self._result
+
def finished(self):
return False
-
- def test_only_write_enough_on_success(self):
- copies = 3
- pool = arvados.KeepClient.KeepWriterThreadPool(
+ def setUp(self):
+ self.copies = 3
+ self.pool = arvados.KeepClient.KeepWriterThreadPool(
data = 'foo',
data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
- max_service_replicas = copies,
- copies = copies
+ max_service_replicas = self.copies,
+ copies = self.copies
)
+
+ def test_only_write_enough_on_success(self):
for i in range(10):
ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
- pool.add_task(ks, None)
- pool.join()
- self.assertEqual(pool.done(), copies)
+ self.pool.add_task(ks, None)
+ self.pool.join()
+ self.assertEqual(self.pool.done(), self.copies)
def test_only_write_enough_on_partial_success(self):
- copies = 3
- pool = arvados.KeepClient.KeepWriterThreadPool(
- data = 'foo',
- data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
- max_service_replicas = copies,
- copies = copies
- )
for i in range(5):
ks = self.FakeKeepService(delay=i/10.0, will_succeed=False)
- pool.add_task(ks, None)
+ self.pool.add_task(ks, None)
+ ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
+ self.pool.add_task(ks, None)
+ self.pool.join()
+ self.assertEqual(self.pool.done(), self.copies)
+
+ def test_only_write_enough_when_some_crash(self):
+ for i in range(5):
+ ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
+ self.pool.add_task(ks, None)
+ ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
+ self.pool.add_task(ks, None)
+ self.pool.join()
+ self.assertEqual(self.pool.done(), self.copies)
+
+ def test_fail_when_too_many_crash(self):
+ for i in range(self.copies+1):
+ ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
+ self.pool.add_task(ks, None)
+ for i in range(self.copies-1):
ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
- pool.add_task(ks, None)
- pool.join()
- self.assertEqual(pool.done(), copies)
+ self.pool.add_task(ks, None)
+ self.pool.join()
+ self.assertEqual(self.pool.done(), self.copies-1)
@tutil.skip_sleep
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list