[ARVADOS] created: 15eb9d37a2e5fba757459db6ed992638aee3ca4a

Git user git at public.curoverse.com
Fri Nov 25 17:04:12 EST 2016


        at  15eb9d37a2e5fba757459db6ed992638aee3ca4a (commit)


commit 15eb9d37a2e5fba757459db6ed992638aee3ca4a
Author: Tom Clegg <tom at curoverse.com>
Date:   Fri Nov 25 17:03:44 2016 -0500

    10586: Fix deadlock in KeepWriterThreadPool.
    
    Previously, an unhandled exception in KeepWriterThread (including
    KeepService.put()) would terminate the thread without calling
    queue.task_done(), which would cause queue.join() to deadlock.
    
    Additionally, this commit moves all of the "wait until work is needed"
    logic from KeepWriterThread into KeepWriterQueue.
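
The failure mode is standard Queue accounting: Queue.join() only returns once
task_done() has been called for every item that was put on the queue, so a
worker thread that dies on an unhandled exception before reaching task_done()
leaves join() blocked forever.  A minimal standalone sketch of the bug and of
the try/finally fix applied here (plain Python 2 Queue/threading; illustrative
only, not the Arvados code):

    import Queue
    import threading

    def worker(q):
        while True:
            try:
                task = q.get_nowait()
            except Queue.Empty:
                return
            try:
                task()              # may raise
            except Exception:
                pass                # the real thread logs it with _logger.exception()
            finally:
                q.task_done()       # always balance the get(), so join() can finish

    q = Queue.Queue()
    q.put(lambda: None)             # a task that succeeds
    q.put(lambda: 1 / 0)            # a task that raises
    t = threading.Thread(target=worker, args=(q,))
    t.daemon = True
    t.start()
    q.join()                        # returns; without the finally it would hang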

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index db7835b..56f3ecc 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -511,6 +511,10 @@ class KeepClient(object):
             with self.successful_copies_lock:
                 self.successful_copies += replicas_nr
                 self.response = response
+            if self.pending_copies() < 1:
+                # Remaining workers are no longer needed
+                with self.pending_tries_notification:
+                    self.pending_tries_notification.notify_all()
         
         def write_fail(self, ks, status_code):
             with self.pending_tries_notification:
@@ -520,8 +524,23 @@ class KeepClient(object):
         def pending_copies(self):
             with self.successful_copies_lock:
                 return self.wanted_copies - self.successful_copies
-    
-    
+
+        def get_next_task(self):
+            with self.pending_tries_notification:
+                while self.pending_tries < 1 and self.pending_copies() > 0:
+                    # Enough threads are already running.  Wait until
+                    # one fails, or enough threads succeed to make
+                    # further work unnecessary.
+                    self.pending_tries_notification.wait()
+                if self.pending_copies() < 1:
+                    # Drain the queue and then raise Queue.Empty
+                    while True:
+                        self.get_nowait()
+                        self.task_done()
+                self.pending_tries -= 1
+                return self.get_nowait()
+
+
     class KeepWriterThreadPool(object):
         def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
             self.total_task_nr = 0
@@ -567,58 +586,47 @@ class KeepClient(object):
             self.queue = queue
             self.data = data
             self.data_hash = data_hash
-        
+            self.daemon = True
+
         def run(self):
-            while not self.queue.empty():
-                if self.queue.pending_copies() > 0:
-                    # Avoid overreplication, wait for some needed re-attempt
-                    with self.queue.pending_tries_notification:
-                        if self.queue.pending_tries <= 0:
-                            self.queue.pending_tries_notification.wait()
-                            continue # try again when awake
-                        self.queue.pending_tries -= 1
-
-                    # Get to work
-                    try:
-                        service, service_root = self.queue.get_nowait()
-                    except Queue.Empty:
-                        continue
-                    if service.finished():
-                        self.queue.task_done()
-                        continue
-                    success = bool(service.put(self.data_hash,
-                                                self.data,
-                                                timeout=self.timeout))
-                    result = service.last_result()
-                    if success:
-                        _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
-                                      str(threading.current_thread()),
-                                      self.data_hash,
-                                      len(self.data),
-                                      service_root)
-                        try:
-                            replicas_stored = int(result['headers']['x-keep-replicas-stored'])
-                        except (KeyError, ValueError):
-                            replicas_stored = 1
-                        
-                        self.queue.write_success(result['body'].strip(), replicas_stored)
-                    else:
-                        if result.get('status_code', None):
-                            _logger.debug("Request fail: PUT %s => %s %s",
-                                          self.data_hash,
-                                          result['status_code'],
-                                          result['body'])
-                        self.queue.write_fail(service, result.get('status_code', None)) # Schedule a re-attempt with next service
-                    # Mark as done so the queue can be join()ed
+            while True:
+                try:
+                    service, service_root = self.queue.get_next_task()
+                except Queue.Empty:
+                    return
+                try:
+                    self.do_task(service, service_root)
+                except Exception as e:
+                    _logger.exception("Exception in KeepWriterThread")
+                finally:
                     self.queue.task_done()
-                else:
-                    # Remove the task from the queue anyways
-                    try:
-                        self.queue.get_nowait()
-                        # Mark as done so the queue can be join()ed
-                        self.queue.task_done()
-                    except Queue.Empty:
-                        continue
+
+        def do_task(self, service, service_root):
+            if service.finished():
+                return
+            success = bool(service.put(self.data_hash,
+                                        self.data,
+                                        timeout=self.timeout))
+            result = service.last_result()
+            if success:
+                _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
+                              str(threading.current_thread()),
+                              self.data_hash,
+                              len(self.data),
+                              service_root)
+                try:
+                    replicas_stored = int(result['headers']['x-keep-replicas-stored'])
+                except (KeyError, ValueError):
+                    replicas_stored = 1
+
+                self.queue.write_success(result['body'].strip(), replicas_stored)
+            else:
+                if result.get('status_code', None):
+                    _logger.debug("Request fail: PUT %s => %s %s",
+                                  self.data_hash,
+                                  result['status_code'],
+                                  result['body'])
+                self.queue.write_fail(service, result.get('status_code', None)) # Schedule a re-attempt with next service
 
 
     def __init__(self, api_client=None, proxy=None,
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index c533a64..70aa110 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -1068,58 +1068,74 @@ class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
             self.check_exception(copies=2, num_retries=3)
 
 
-class KeepClientAvoidClientOverreplicationTestCase(unittest.TestCase, tutil.ApiClientMock):
-    
-    
+class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
+
     class FakeKeepService(object):
-        def __init__(self, delay, will_succeed, replicas=1):
+        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
             self.delay = delay
-            self.success = will_succeed
+            self.will_succeed = will_succeed
+            self.will_raise = will_raise
             self._result = {}
             self._result['headers'] = {}
             self._result['headers']['x-keep-replicas-stored'] = str(replicas)
             self._result['body'] = 'foobar'
-        
+
         def put(self, data_hash, data, timeout):
             time.sleep(self.delay)
-            return self.success
-        
+            if self.will_raise is not None:
+                raise self.will_raise
+            return self.will_succeed
+
         def last_result(self):
-            return self._result
-        
+            if self.will_succeed:
+                return self._result
+
         def finished(self):
             return False
     
-    
-    def test_only_write_enough_on_success(self):
-        copies = 3
-        pool = arvados.KeepClient.KeepWriterThreadPool(
+    def setUp(self):
+        self.copies = 3
+        self.pool = arvados.KeepClient.KeepWriterThreadPool(
             data = 'foo',
             data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
-            max_service_replicas = copies,
-            copies = copies
+            max_service_replicas = self.copies,
+            copies = self.copies
         )
+
+    def test_only_write_enough_on_success(self):
         for i in range(10):
             ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
-            pool.add_task(ks, None)
-        pool.join()
-        self.assertEqual(pool.done(), copies)
+            self.pool.add_task(ks, None)
+        self.pool.join()
+        self.assertEqual(self.pool.done(), self.copies)
 
     def test_only_write_enough_on_partial_success(self):
-        copies = 3
-        pool = arvados.KeepClient.KeepWriterThreadPool(
-            data = 'foo',
-            data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
-            max_service_replicas = copies,
-            copies = copies
-        )
         for i in range(5):
             ks = self.FakeKeepService(delay=i/10.0, will_succeed=False)
-            pool.add_task(ks, None)
+            self.pool.add_task(ks, None)
+            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
+            self.pool.add_task(ks, None)
+        self.pool.join()
+        self.assertEqual(self.pool.done(), self.copies)
+
+    def test_only_write_enough_when_some_crash(self):
+        for i in range(5):
+            ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
+            self.pool.add_task(ks, None)
+            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
+            self.pool.add_task(ks, None)
+        self.pool.join()
+        self.assertEqual(self.pool.done(), self.copies)
+
+    def test_fail_when_too_many_crash(self):
+        for i in range(self.copies+1):
+            ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
+            self.pool.add_task(ks, None)
+        for i in range(self.copies-1):
             ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
-            pool.add_task(ks, None)
-        pool.join()
-        self.assertEqual(pool.done(), copies)
+            self.pool.add_task(ks, None)
+        self.pool.join()
+        self.assertEqual(self.pool.done(), self.copies-1)
     
 
 @tutil.skip_sleep

commit 62857acf3fef2f05228ba69954d44c793bf5982d
Author: Tom Clegg <tom at curoverse.com>
Date:   Fri Nov 25 11:58:44 2016 -0500

    10586: Add tests for block-writing concurrency via Collection class.
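
These tests rely on a hook visible in the diffs below: a stub-server delay
(keepstub.Server.setdelays) and a FakeCurl response body may now be callables,
so a test can park every in-flight PUT until it has counted the expected number
of concurrent writer threads, then release them all at once.  A minimal sketch
of that pattern (the class name is invented; illustrative only, not the test
code itself):

    import threading

    class BlockingDelay(object):
        """A callable 'delay': each call blocks until release() is invoked."""
        def __init__(self):
            self.reached = 0                 # how many requests are currently waiting
            self._lock = threading.Lock()
            self._release = threading.Event()

        def __call__(self):                  # passed where a numeric delay is expected
            with self._lock:
                self.reached += 1
            self._release.wait()             # block until the test lets everyone go

        def release(self):
            self._release.set()

    # Usage mirroring the test's setdelays() call:
    #   delay = BlockingDelay()
    #   server.setdelays(response=delay)
    #   ... start the writer, wait until delay.reached == expected ...
    #   delay.release()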

diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py
index 71c9b17..25a6b40 100644
--- a/sdk/python/tests/arvados_testutil.py
+++ b/sdk/python/tests/arvados_testutil.py
@@ -85,6 +85,8 @@ class FakeCurl:
             self._headerfunction("HTTP/1.1 {} Status".format(self._resp_code))
             for k, v in self._resp_headers.iteritems():
                 self._headerfunction(k + ': ' + str(v))
+        while callable(self._resp_body):
+            self._resp_body = self._resp_body()
         if type(self._resp_body) is not bool:
             self._writer(self._resp_body)
 
diff --git a/sdk/python/tests/keepstub.py b/sdk/python/tests/keepstub.py
index d79788c..b8cd975 100644
--- a/sdk/python/tests/keepstub.py
+++ b/sdk/python/tests/keepstub.py
@@ -1,10 +1,53 @@
 import BaseHTTPServer
 import hashlib
+import mock
 import os
 import re
+import socket
 import SocketServer
+import threading
 import time
 
+import arvados_testutil as tutil
+
+
+class TestCase(tutil.ApiClientMock):
+    STUB_COUNT = 1
+    STUB_TYPE = 'disk'
+
+    def setUp(self):
+        self._stub_servers = []
+        ports = []
+        for i in range(self.STUB_COUNT):
+            sock = socket.socket()
+            sock.bind(('0.0.0.0', 0))
+            port = sock.getsockname()[1]
+            sock.close()
+            server = Server(('0.0.0.0', port), Handler)
+            thread = threading.Thread(target=server.serve_forever)
+            thread.daemon = True
+            thread.start()
+            ports.append(port)
+            self._stub_servers.append(server)
+        self.api_client = mock.MagicMock(name='api_client_mock')
+        self.api_client.keep_services().accessible().execute.return_value = {
+            'items_available': len(ports),
+            'items': [{
+                'uuid': 'zzzzz-bi6l4-{:015x}'.format(i),
+                'owner_uuid': 'zzzzz-tpzed-000000000000000',
+                'service_host': '0.0.0.0',
+                'service_port': port,
+                'service_ssl_flag': False,
+                'service_type': self.STUB_TYPE,
+                'read_only': False,
+            } for i, port in enumerate(ports)],
+        }
+
+    def tearDown(self):
+        for server in self._stub_servers:
+            server.shutdown()
+
+
 class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer, object):
 
     allow_reuse_address = 1
@@ -33,7 +76,7 @@ class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer, object):
     def setdelays(self, **kwargs):
         """In future requests, induce delays at the given checkpoints."""
         for (k, v) in kwargs.iteritems():
-            self.delays.get(k) # NameError if unknown key
+            self.delays.get(k)  # NameError if unknown key
             self.delays[k] = v
 
     def setbandwidth(self, bandwidth):
@@ -51,15 +94,18 @@ class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer, object):
             todo = wake - time.time()
 
     def _do_delay(self, k):
+        if callable(self.delays[k]):
+            return self.delays[k]()
         self._sleep_at_least(self.delays[k])
 
 
 class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
+
     def wfile_bandwidth_write(self, data_to_write):
         if self.server.bandwidth == None and self.server.delays['mid_write'] == 0:
             self.wfile.write(data_to_write)
         else:
-            BYTES_PER_WRITE = int(self.server.bandwidth/4.0) or 32768
+            BYTES_PER_WRITE = int(self.server.bandwidth / 4.0) or 32768
             outage_happened = False
             num_bytes = len(data_to_write)
             num_sent_bytes = 0
@@ -70,9 +116,9 @@ class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
                     target_time += self.delays['mid_write']
                     outage_happened = True
                 num_write_bytes = min(BYTES_PER_WRITE,
-                    num_bytes - num_sent_bytes)
+                                      num_bytes - num_sent_bytes)
                 self.wfile.write(data_to_write[
-                    num_sent_bytes:num_sent_bytes+num_write_bytes])
+                    num_sent_bytes:num_sent_bytes + num_write_bytes])
                 num_sent_bytes += num_write_bytes
                 if self.server.bandwidth is not None:
                     target_time += num_write_bytes / self.server.bandwidth
@@ -83,7 +129,7 @@ class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
         if self.server.bandwidth == None and self.server.delays['mid_read'] == 0:
             return self.rfile.read(bytes_to_read)
         else:
-            BYTES_PER_READ = int(self.server.bandwidth/4.0) or 32768
+            BYTES_PER_READ = int(self.server.bandwidth / 4.0) or 32768
             data = ''
             outage_happened = False
             bytes_read = 0
@@ -94,7 +140,7 @@ class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
                     target_time += self.delays['mid_read']
                     outage_happened = True
                 next_bytes_to_read = min(BYTES_PER_READ,
-                    bytes_to_read - bytes_read)
+                                         bytes_to_read - bytes_read)
                 data += self.rfile.read(next_bytes_to_read)
                 bytes_read += next_bytes_to_read
                 if self.server.bandwidth is not None:
@@ -131,7 +177,8 @@ class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
             return self.send_response(404)
         self.send_response(200)
         self.send_header('Content-type', 'application/octet-stream')
-        self.send_header('Content-length', str(len(self.server.store[datahash])))
+        self.send_header(
+            'Content-length', str(len(self.server.store[datahash])))
         self.end_headers()
         self.server._do_delay('response_close')
 
@@ -142,8 +189,9 @@ class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
         # reading the actual code that ships in Debian it clearly is not, so we
         # need to send the response on the socket directly.
         self.wfile_bandwidth_write("%s %d %s\r\n\r\n" %
-                         (self.protocol_version, 100, "Continue"))
-        data = self.rfile_bandwidth_read(int(self.headers.getheader('content-length')))
+                                  (self.protocol_version, 100, "Continue"))
+        data = self.rfile_bandwidth_read(
+            int(self.headers.getheader('content-length')))
         datahash = hashlib.md5(data).hexdigest()
         self.server.store[datahash] = data
         self.server._do_delay('response')
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index fc30a24..5e3cbcb 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -9,12 +9,14 @@ import os
 import pprint
 import re
 import tempfile
+import threading
 import unittest
 
 import run_test_server
 from arvados._ranges import Range, LocatorAndRange
 from arvados.collection import Collection, CollectionReader
 import arvados_testutil as tutil
+import keepstub
 
 class TestResumableWriter(arvados.ResumableCollectionWriter):
     KEEP_BLOCK_SIZE = 1024  # PUT to Keep every 1K.
@@ -1215,5 +1217,72 @@ class CollectionCreateUpdateTest(run_test_server.TestCaseWithServers):
         self.assertRegexpMatches(c1.manifest_text(), r"\. e65075d550f9b5bf9992fa1d71a131be\+3\S* 7ac66c0f148de9519b8bd264312c4d64\+7\S* 0:3:count\.txt 3:7:count\.txt~\d\d\d\d\d\d\d\d-\d\d\d\d\d\d~conflict~$")
 
 
+class CollectionAsyncWrites(keepstub.TestCase, unittest.TestCase):
+    DATA = bytearray(1<<25)
+    LOCATOR = '7f614da9329cd3aebf59b91aadc30bf0+67108864'
+    TEST_TIMEOUT = 10.0
+    BUFFERED_BLOCKS = 4
+    EXPECT_REPLICAS = 2
+    EXPECT_THREADS = EXPECT_REPLICAS * BUFFERED_BLOCKS
+    STUB_COUNT = 5
+    STUB_TYPE = 'disk'
+
+    def setUp(self):
+        super(CollectionAsyncWrites, self).setUp()
+        self._wait_cond = threading.Condition()
+        self._wait_count = 0
+        self._release = threading.Event()
+
+    def wait_and_release(self, tag=None):
+        def f():
+            _cond = self._wait_cond
+            _release = self._release
+            _cond.acquire()
+            self._wait_count += 1
+            print 'wait_and_release({}): {}'.format(self._wait_count, tag)
+            _cond.notify()
+            _cond.release()
+            _release.wait()
+            return self.LOCATOR
+        return f
+
+    def write_data(self):
+        c = arvados.collection.Collection(api_client=self.api_client, replication_desired=2)
+        c._my_block_manager().num_put_threads = self.BUFFERED_BLOCKS
+        for i in range(self.BUFFERED_BLOCKS * arvados.config.KEEP_BLOCK_SIZE / len(self.DATA)):
+            f = c.open('file' + str(i) + '.txt', 'w')
+            f.write(self.DATA)
+            f.close(flush=False)
+
+    def test_concurrent_writes_mocked_pycurl(self):
+        with tutil.mock_keep_responses(self.wait_and_release(), *(200 for _ in range(20))):
+            self._test_queued_writes()
+
+    def test_concurrent_writes_mocked_server(self):
+        for server in self._stub_servers:
+            server.setdelays(response=self.wait_and_release(server.server_address))
+        self._test_queued_writes()
+
+    def _test_queued_writes(self):
+        writer = threading.Thread(target=self.write_data)
+        writer.daemon = True
+        writer.start()
+        self._wait_cond.acquire()
+        try:
+            while self._wait_count < self.EXPECT_THREADS:
+                _was = self._wait_count
+                self._wait_cond.wait(self.TEST_TIMEOUT)
+                self.assertTrue(_was < self._wait_count, 'timeout: only {} threads have called put(), expect {}'.format(self._wait_count, self.EXPECT_THREADS))
+        finally:
+            self._wait_cond.release()
+            self._release.set()
+        writer.join(self.TEST_TIMEOUT)
+
+
+class CollectionAsyncWritesViaProxy(CollectionAsyncWrites):
+    STUB_TYPE = 'proxy'
+    EXPECT_THREADS = CollectionAsyncWrites.BUFFERED_BLOCKS
+
+
 if __name__ == '__main__':
     unittest.main()
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 908539b..c533a64 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -660,13 +660,18 @@ class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
         self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
 
 
-class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
+class KeepClientTimeout(keepstub.TestCase, unittest.TestCase):
     # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
     # 1s worth of data and then trigger bandwidth errors before running
     # out of data.
     DATA = 'x'*2**11
     BANDWIDTH_LOW_LIM = 1024
     TIMEOUT_TIME = 1.0
+    STUB_COUNT = 1
+
+    def setUp(self):
+        super(KeepClientTimeout, self).setUp()
+        self.server = self._stub_servers[0]
 
     class assertTakesBetween(unittest.TestCase):
         def __init__(self, tmin, tmax):
@@ -694,24 +699,6 @@ class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
             delta = round(time.time() - self.t0, 3)
             self.assertGreaterEqual(delta, self.tmin)
 
-    def setUp(self):
-        sock = socket.socket()
-        sock.bind(('0.0.0.0', 0))
-        self.port = sock.getsockname()[1]
-        sock.close()
-        self.server = keepstub.Server(('0.0.0.0', self.port), keepstub.Handler)
-        self.thread = threading.Thread(target=self.server.serve_forever)
-        self.thread.daemon = True # Exit thread if main proc exits
-        self.thread.start()
-        self.api_client = self.mock_keep_services(
-            count=1,
-            service_host='localhost',
-            service_port=self.port,
-        )
-
-    def tearDown(self):
-        self.server.shutdown()
-
     def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
         return arvados.KeepClient(
             api_client=self.api_client,

-----------------------------------------------------------------------


hooks/post-receive
-- 