[ARVADOS] updated: 7c2b38632ae2af8a2f0f5c8fa2e55523220a3335

Git user git at public.curoverse.com
Fri Nov 25 18:10:40 EST 2016


Summary of changes:
 sdk/python/arvados/keep.py           | 70 ++++++++++++++++++------------------
 sdk/python/tests/arvados_testutil.py |  2 --
 sdk/python/tests/keepstub.py         | 66 +++++-----------------------------
 sdk/python/tests/test_collections.py | 69 -----------------------------------
 sdk/python/tests/test_keep_client.py | 25 +++++++++----
 5 files changed, 63 insertions(+), 169 deletions(-)

  discards  15eb9d37a2e5fba757459db6ed992638aee3ca4a (commit)
  discards  62857acf3fef2f05228ba69954d44c793bf5982d (commit)
       via  7c2b38632ae2af8a2f0f5c8fa2e55523220a3335 (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the history of the old revision is not fully contained in the
history of the new revision.  This situation occurs when you force-push a
change (git push --force) and generate a repository containing something
like this:

 * -- * -- B -- O -- O -- O (15eb9d37a2e5fba757459db6ed992638aee3ca4a)
            \
             N -- N -- N (7c2b38632ae2af8a2f0f5c8fa2e55523220a3335)

When this happens we assume that you have already received alert emails
for all of the O revisions, so here we report only the revisions on the N
branch, starting from the common base, B.

The revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit 7c2b38632ae2af8a2f0f5c8fa2e55523220a3335
Author: Tom Clegg <tom at curoverse.com>
Date:   Fri Nov 25 18:10:16 2016 -0500

    10586: Fix deadlock in KeepWriterThreadPool.
    
    Previously, an unhandled exception in KeepWriterThread (including
    KeepService.put()) would terminate the thread without calling
    queue.task_done(), which would cause queue.join() to deadlock.
    
    Additionally, this commit moves all of the "wait until work is needed"
    logic from KeepWriterThread into KeepWriterQueue.

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index db7835b..776c9b2 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -511,8 +511,10 @@ class KeepClient(object):
             with self.successful_copies_lock:
                 self.successful_copies += replicas_nr
                 self.response = response
+            with self.pending_tries_notification:
+                self.pending_tries_notification.notify_all()
         
-        def write_fail(self, ks, status_code):
+        def write_fail(self, ks):
             with self.pending_tries_notification:
                 self.pending_tries += 1
                 self.pending_tries_notification.notify()
@@ -520,8 +522,25 @@ class KeepClient(object):
         def pending_copies(self):
             with self.successful_copies_lock:
                 return self.wanted_copies - self.successful_copies
-    
-    
+
+        def get_next_task(self):
+            with self.pending_tries_notification:
+                while True:
+                    if self.pending_copies() < 1:
+                        # Drain the queue and then raise Queue.Empty
+                        while True:
+                            self.get_nowait()
+                            self.task_done()
+                    elif self.pending_tries > 0:
+                        self.pending_tries -= 1
+                        return self.get_nowait()
+                    elif self.empty():
+                        self.pending_tries_notification.notify_all()
+                        raise Queue.Empty
+                    else:
+                        self.pending_tries_notification.wait()
+
+
     class KeepWriterThreadPool(object):
         def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
             self.total_task_nr = 0
@@ -551,10 +570,6 @@ class KeepClient(object):
                 worker.start()
             # Wait for finished work
             self.queue.join()
-            with self.queue.pending_tries_notification:
-                self.queue.pending_tries_notification.notify_all()
-            for worker in self.workers:
-                worker.join()
         
         def response(self):
             return self.queue.response
@@ -567,58 +582,51 @@ class KeepClient(object):
             self.queue = queue
             self.data = data
             self.data_hash = data_hash
-        
+            self.daemon = True
+
         def run(self):
-            while not self.queue.empty():
-                if self.queue.pending_copies() > 0:
-                    # Avoid overreplication, wait for some needed re-attempt
-                    with self.queue.pending_tries_notification:
-                        if self.queue.pending_tries <= 0:
-                            self.queue.pending_tries_notification.wait()
-                            continue # try again when awake
-                        self.queue.pending_tries -= 1
-
-                    # Get to work
-                    try:
-                        service, service_root = self.queue.get_nowait()
-                    except Queue.Empty:
-                        continue
-                    if service.finished():
-                        self.queue.task_done()
-                        continue
-                    success = bool(service.put(self.data_hash,
-                                                self.data,
-                                                timeout=self.timeout))
-                    result = service.last_result()
-                    if success:
-                        _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
-                                      str(threading.current_thread()),
-                                      self.data_hash,
-                                      len(self.data),
-                                      service_root)
-                        try:
-                            replicas_stored = int(result['headers']['x-keep-replicas-stored'])
-                        except (KeyError, ValueError):
-                            replicas_stored = 1
-                        
-                        self.queue.write_success(result['body'].strip(), replicas_stored)
-                    else:
-                        if result.get('status_code', None):
-                            _logger.debug("Request fail: PUT %s => %s %s",
-                                          self.data_hash,
-                                          result['status_code'],
-                                          result['body'])
-                        self.queue.write_fail(service, result.get('status_code', None)) # Schedule a re-attempt with next service
-                    # Mark as done so the queue can be join()ed
-                    self.queue.task_done()
+            while True:
+                try:
+                    service, service_root = self.queue.get_next_task()
+                except Queue.Empty:
+                    return
+                try:
+                    locator, copies = self.do_task(service, service_root)
+                except Exception as e:
+                    _logger.exception("Exception in KeepWriterThread")
+                    self.queue.write_fail(service)
                 else:
-                    # Remove the task from the queue anyways
-                    try:
-                        self.queue.get_nowait()
-                        # Mark as done so the queue can be join()ed
-                        self.queue.task_done()
-                    except Queue.Empty:
-                        continue
+                    self.queue.write_success(locator, copies)
+                finally:
+                    self.queue.task_done()
+
+        def do_task(self, service, service_root):
+            if service.finished():
+                return
+            success = bool(service.put(self.data_hash,
+                                        self.data,
+                                        timeout=self.timeout))
+            result = service.last_result()
+
+            if not success:
+                if result.get('status_code', None):
+                    _logger.debug("Request fail: PUT %s => %s %s",
+                                  self.data_hash,
+                                  result['status_code'],
+                                  result['body'])
+                raise RuntimeError()
+
+            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
+                          str(threading.current_thread()),
+                          self.data_hash,
+                          len(self.data),
+                          service_root)
+            try:
+                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
+            except (KeyError, ValueError):
+                replicas_stored = 1
+
+            return result['body'].strip(), replicas_stored
 
 
     def __init__(self, api_client=None, proxy=None,
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 908539b..85b5bc8 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -1081,58 +1081,74 @@ class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
             self.check_exception(copies=2, num_retries=3)
 
 
-class KeepClientAvoidClientOverreplicationTestCase(unittest.TestCase, tutil.ApiClientMock):
-    
-    
+class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
+
     class FakeKeepService(object):
-        def __init__(self, delay, will_succeed, replicas=1):
+        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
             self.delay = delay
-            self.success = will_succeed
+            self.will_succeed = will_succeed
+            self.will_raise = will_raise
             self._result = {}
             self._result['headers'] = {}
             self._result['headers']['x-keep-replicas-stored'] = str(replicas)
             self._result['body'] = 'foobar'
-        
+
         def put(self, data_hash, data, timeout):
             time.sleep(self.delay)
-            return self.success
-        
+            if self.will_raise is not None:
+                raise self.will_raise
+            return self.will_succeed
+
         def last_result(self):
-            return self._result
-        
+            if self.will_succeed:
+                return self._result
+
         def finished(self):
             return False
     
-    
-    def test_only_write_enough_on_success(self):
-        copies = 3
-        pool = arvados.KeepClient.KeepWriterThreadPool(
+    def setUp(self):
+        self.copies = 3
+        self.pool = arvados.KeepClient.KeepWriterThreadPool(
             data = 'foo',
             data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
-            max_service_replicas = copies,
-            copies = copies
+            max_service_replicas = self.copies,
+            copies = self.copies
         )
+
+    def test_only_write_enough_on_success(self):
         for i in range(10):
             ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
-            pool.add_task(ks, None)
-        pool.join()
-        self.assertEqual(pool.done(), copies)
+            self.pool.add_task(ks, None)
+        self.pool.join()
+        self.assertEqual(self.pool.done(), self.copies)
 
     def test_only_write_enough_on_partial_success(self):
-        copies = 3
-        pool = arvados.KeepClient.KeepWriterThreadPool(
-            data = 'foo',
-            data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
-            max_service_replicas = copies,
-            copies = copies
-        )
         for i in range(5):
             ks = self.FakeKeepService(delay=i/10.0, will_succeed=False)
-            pool.add_task(ks, None)
+            self.pool.add_task(ks, None)
+            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
+            self.pool.add_task(ks, None)
+        self.pool.join()
+        self.assertEqual(self.pool.done(), self.copies)
+
+    def test_only_write_enough_when_some_crash(self):
+        for i in range(5):
+            ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
+            self.pool.add_task(ks, None)
+            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
+            self.pool.add_task(ks, None)
+        self.pool.join()
+        self.assertEqual(self.pool.done(), self.copies)
+
+    def test_fail_when_too_many_crash(self):
+        for i in range(self.copies+1):
+            ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
+            self.pool.add_task(ks, None)
+        for i in range(self.copies-1):
             ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
-            pool.add_task(ks, None)
-        pool.join()
-        self.assertEqual(pool.done(), copies)
+            self.pool.add_task(ks, None)
+        self.pool.join()
+        self.assertEqual(self.pool.done(), self.copies-1)
     
 
 @tutil.skip_sleep

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list