[ARVADOS] created: 15eb9d37a2e5fba757459db6ed992638aee3ca4a
Git user
git at public.curoverse.com
Fri Nov 25 17:04:12 EST 2016
at 15eb9d37a2e5fba757459db6ed992638aee3ca4a (commit)
commit 15eb9d37a2e5fba757459db6ed992638aee3ca4a
Author: Tom Clegg <tom at curoverse.com>
Date: Fri Nov 25 17:03:44 2016 -0500
10586: Fix deadlock in KeepWriterThreadPool.
Previously, an unhandled exception in KeepWriterThread (including one
raised by KeepService.put()) would terminate the thread without calling
queue.task_done(), causing queue.join() to deadlock.
Additionally, this commit moves all of the "wait until work is needed"
logic from KeepWriterThread into KeepWriterQueue.get_next_task().
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index db7835b..56f3ecc 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -511,6 +511,10 @@ class KeepClient(object):
with self.successful_copies_lock:
self.successful_copies += replicas_nr
self.response = response
+ if self.pending_copies() < 1:
+ # Remaining workers are no longer needed
+ with self.pending_tries_notification:
+ self.pending_tries_notification.notify_all()
def write_fail(self, ks, status_code):
with self.pending_tries_notification:
@@ -520,8 +524,23 @@ class KeepClient(object):
def pending_copies(self):
with self.successful_copies_lock:
return self.wanted_copies - self.successful_copies
-
-
+
+ def get_next_task(self):
+ with self.pending_tries_notification:
+ while self.pending_tries < 1 and self.pending_copies() > 0:
+ # Enough threads are already running. Wait until
+ # one fails, or enough threads succeed to make
+ # further work unnecessary.
+ self.pending_tries_notification.wait()
+ if self.pending_copies() < 1:
+ # Drain the queue and then raise Queue.Empty
+ while True:
+ self.get_nowait()
+ self.task_done()
+ self.pending_tries -= 1
+ return self.get_nowait()
+
+
class KeepWriterThreadPool(object):
def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
self.total_task_nr = 0
@@ -567,58 +586,47 @@ class KeepClient(object):
self.queue = queue
self.data = data
self.data_hash = data_hash
-
+ self.daemon = True
+
def run(self):
- while not self.queue.empty():
- if self.queue.pending_copies() > 0:
- # Avoid overreplication, wait for some needed re-attempt
- with self.queue.pending_tries_notification:
- if self.queue.pending_tries <= 0:
- self.queue.pending_tries_notification.wait()
- continue # try again when awake
- self.queue.pending_tries -= 1
-
- # Get to work
- try:
- service, service_root = self.queue.get_nowait()
- except Queue.Empty:
- continue
- if service.finished():
- self.queue.task_done()
- continue
- success = bool(service.put(self.data_hash,
- self.data,
- timeout=self.timeout))
- result = service.last_result()
- if success:
- _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
- str(threading.current_thread()),
- self.data_hash,
- len(self.data),
- service_root)
- try:
- replicas_stored = int(result['headers']['x-keep-replicas-stored'])
- except (KeyError, ValueError):
- replicas_stored = 1
-
- self.queue.write_success(result['body'].strip(), replicas_stored)
- else:
- if result.get('status_code', None):
- _logger.debug("Request fail: PUT %s => %s %s",
- self.data_hash,
- result['status_code'],
- result['body'])
- self.queue.write_fail(service, result.get('status_code', None)) # Schedule a re-attempt with next service
- # Mark as done so the queue can be join()ed
+ while True:
+ try:
+ service, service_root = self.queue.get_next_task()
+ except Queue.Empty:
+ return
+ try:
+ self.do_task(service, service_root)
+ except Exception as e:
+ _logger.exception("Exception in KeepWriterThread")
+ finally:
self.queue.task_done()
- else:
- # Remove the task from the queue anyways
- try:
- self.queue.get_nowait()
- # Mark as done so the queue can be join()ed
- self.queue.task_done()
- except Queue.Empty:
- continue
+
+ def do_task(self, service, service_root):
+ if service.finished():
+ return
+ success = bool(service.put(self.data_hash,
+ self.data,
+ timeout=self.timeout))
+ result = service.last_result()
+ if success:
+ _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
+ str(threading.current_thread()),
+ self.data_hash,
+ len(self.data),
+ service_root)
+ try:
+ replicas_stored = int(result['headers']['x-keep-replicas-stored'])
+ except (KeyError, ValueError):
+ replicas_stored = 1
+
+ self.queue.write_success(result['body'].strip(), replicas_stored)
+ else:
+ if result.get('status_code', None):
+ _logger.debug("Request fail: PUT %s => %s %s",
+ self.data_hash,
+ result['status_code'],
+ result['body'])
+ self.queue.write_fail(service, result.get('status_code', None)) # Schedule a re-attempt with next service
def __init__(self, api_client=None, proxy=None,
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index c533a64..70aa110 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -1068,58 +1068,74 @@ class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
self.check_exception(copies=2, num_retries=3)
-class KeepClientAvoidClientOverreplicationTestCase(unittest.TestCase, tutil.ApiClientMock):
-
-
+class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
+
class FakeKeepService(object):
- def __init__(self, delay, will_succeed, replicas=1):
+ def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
self.delay = delay
- self.success = will_succeed
+ self.will_succeed = will_succeed
+ self.will_raise = will_raise
self._result = {}
self._result['headers'] = {}
self._result['headers']['x-keep-replicas-stored'] = str(replicas)
self._result['body'] = 'foobar'
-
+
def put(self, data_hash, data, timeout):
time.sleep(self.delay)
- return self.success
-
+ if self.will_raise is not None:
+ raise self.will_raise
+ return self.will_succeed
+
def last_result(self):
- return self._result
-
+ if self.will_succeed:
+ return self._result
+
def finished(self):
return False
-
- def test_only_write_enough_on_success(self):
- copies = 3
- pool = arvados.KeepClient.KeepWriterThreadPool(
+ def setUp(self):
+ self.copies = 3
+ self.pool = arvados.KeepClient.KeepWriterThreadPool(
data = 'foo',
data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
- max_service_replicas = copies,
- copies = copies
+ max_service_replicas = self.copies,
+ copies = self.copies
)
+
+ def test_only_write_enough_on_success(self):
for i in range(10):
ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
- pool.add_task(ks, None)
- pool.join()
- self.assertEqual(pool.done(), copies)
+ self.pool.add_task(ks, None)
+ self.pool.join()
+ self.assertEqual(self.pool.done(), self.copies)
def test_only_write_enough_on_partial_success(self):
- copies = 3
- pool = arvados.KeepClient.KeepWriterThreadPool(
- data = 'foo',
- data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
- max_service_replicas = copies,
- copies = copies
- )
for i in range(5):
ks = self.FakeKeepService(delay=i/10.0, will_succeed=False)
- pool.add_task(ks, None)
+ self.pool.add_task(ks, None)
+ ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
+ self.pool.add_task(ks, None)
+ self.pool.join()
+ self.assertEqual(self.pool.done(), self.copies)
+
+ def test_only_write_enough_when_some_crash(self):
+ for i in range(5):
+ ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
+ self.pool.add_task(ks, None)
+ ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
+ self.pool.add_task(ks, None)
+ self.pool.join()
+ self.assertEqual(self.pool.done(), self.copies)
+
+ def test_fail_when_too_many_crash(self):
+ for i in range(self.copies+1):
+ ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
+ self.pool.add_task(ks, None)
+ for i in range(self.copies-1):
ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
- pool.add_task(ks, None)
- pool.join()
- self.assertEqual(pool.done(), copies)
+ self.pool.add_task(ks, None)
+ self.pool.join()
+ self.assertEqual(self.pool.done(), self.copies-1)
@tutil.skip_sleep
commit 62857acf3fef2f05228ba69954d44c793bf5982d
Author: Tom Clegg <tom at curoverse.com>
Date: Fri Nov 25 11:58:44 2016 -0500
10586: Add tests for block-writing concurrency via Collection class.
diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py
index 71c9b17..25a6b40 100644
--- a/sdk/python/tests/arvados_testutil.py
+++ b/sdk/python/tests/arvados_testutil.py
@@ -85,6 +85,8 @@ class FakeCurl:
self._headerfunction("HTTP/1.1 {} Status".format(self._resp_code))
for k, v in self._resp_headers.iteritems():
self._headerfunction(k + ': ' + str(v))
+ while callable(self._resp_body):
+ self._resp_body = self._resp_body()
if type(self._resp_body) is not bool:
self._writer(self._resp_body)
diff --git a/sdk/python/tests/keepstub.py b/sdk/python/tests/keepstub.py
index d79788c..b8cd975 100644
--- a/sdk/python/tests/keepstub.py
+++ b/sdk/python/tests/keepstub.py
@@ -1,10 +1,53 @@
import BaseHTTPServer
import hashlib
+import mock
import os
import re
+import socket
import SocketServer
+import threading
import time
+import arvados_testutil as tutil
+
+
+class TestCase(tutil.ApiClientMock):
+ STUB_COUNT = 1
+ STUB_TYPE = 'disk'
+
+ def setUp(self):
+ self._stub_servers = []
+ ports = []
+ for i in range(self.STUB_COUNT):
+ sock = socket.socket()
+ sock.bind(('0.0.0.0', 0))
+ port = sock.getsockname()[1]
+ sock.close()
+ server = Server(('0.0.0.0', port), Handler)
+ thread = threading.Thread(target=server.serve_forever)
+ thread.daemon = True
+ thread.start()
+ ports.append(port)
+ self._stub_servers.append(server)
+ self.api_client = mock.MagicMock(name='api_client_mock')
+ self.api_client.keep_services().accessible().execute.return_value = {
+ 'items_available': len(ports),
+ 'items': [{
+ 'uuid': 'zzzzz-bi6l4-{:015x}'.format(i),
+ 'owner_uuid': 'zzzzz-tpzed-000000000000000',
+ 'service_host': '0.0.0.0',
+ 'service_port': port,
+ 'service_ssl_flag': False,
+ 'service_type': self.STUB_TYPE,
+ 'read_only': False,
+ } for i, port in enumerate(ports)],
+ }
+
+ def tearDown(self):
+ for server in self._stub_servers:
+ server.shutdown()
+
+
class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer, object):
allow_reuse_address = 1
@@ -33,7 +76,7 @@ class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer, object):
def setdelays(self, **kwargs):
"""In future requests, induce delays at the given checkpoints."""
for (k, v) in kwargs.iteritems():
- self.delays.get(k) # NameError if unknown key
+ self.delays.get(k) # NameError if unknown key
self.delays[k] = v
def setbandwidth(self, bandwidth):
@@ -51,15 +94,18 @@ class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer, object):
todo = wake - time.time()
def _do_delay(self, k):
+ if callable(self.delays[k]):
+ return self.delays[k]()
self._sleep_at_least(self.delays[k])
class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
+
def wfile_bandwidth_write(self, data_to_write):
if self.server.bandwidth == None and self.server.delays['mid_write'] == 0:
self.wfile.write(data_to_write)
else:
- BYTES_PER_WRITE = int(self.server.bandwidth/4.0) or 32768
+ BYTES_PER_WRITE = int(self.server.bandwidth / 4.0) or 32768
outage_happened = False
num_bytes = len(data_to_write)
num_sent_bytes = 0
@@ -70,9 +116,9 @@ class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
target_time += self.delays['mid_write']
outage_happened = True
num_write_bytes = min(BYTES_PER_WRITE,
- num_bytes - num_sent_bytes)
+ num_bytes - num_sent_bytes)
self.wfile.write(data_to_write[
- num_sent_bytes:num_sent_bytes+num_write_bytes])
+ num_sent_bytes:num_sent_bytes + num_write_bytes])
num_sent_bytes += num_write_bytes
if self.server.bandwidth is not None:
target_time += num_write_bytes / self.server.bandwidth
@@ -83,7 +129,7 @@ class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
if self.server.bandwidth == None and self.server.delays['mid_read'] == 0:
return self.rfile.read(bytes_to_read)
else:
- BYTES_PER_READ = int(self.server.bandwidth/4.0) or 32768
+ BYTES_PER_READ = int(self.server.bandwidth / 4.0) or 32768
data = ''
outage_happened = False
bytes_read = 0
@@ -94,7 +140,7 @@ class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
target_time += self.delays['mid_read']
outage_happened = True
next_bytes_to_read = min(BYTES_PER_READ,
- bytes_to_read - bytes_read)
+ bytes_to_read - bytes_read)
data += self.rfile.read(next_bytes_to_read)
bytes_read += next_bytes_to_read
if self.server.bandwidth is not None:
@@ -131,7 +177,8 @@ class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
return self.send_response(404)
self.send_response(200)
self.send_header('Content-type', 'application/octet-stream')
- self.send_header('Content-length', str(len(self.server.store[datahash])))
+ self.send_header(
+ 'Content-length', str(len(self.server.store[datahash])))
self.end_headers()
self.server._do_delay('response_close')
@@ -142,8 +189,9 @@ class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
# reading the actual code that ships in Debian it clearly is not, so we
# need to send the response on the socket directly.
self.wfile_bandwidth_write("%s %d %s\r\n\r\n" %
- (self.protocol_version, 100, "Continue"))
- data = self.rfile_bandwidth_read(int(self.headers.getheader('content-length')))
+ (self.protocol_version, 100, "Continue"))
+ data = self.rfile_bandwidth_read(
+ int(self.headers.getheader('content-length')))
datahash = hashlib.md5(data).hexdigest()
self.server.store[datahash] = data
self.server._do_delay('response')
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index fc30a24..5e3cbcb 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -9,12 +9,14 @@ import os
import pprint
import re
import tempfile
+import threading
import unittest
import run_test_server
from arvados._ranges import Range, LocatorAndRange
from arvados.collection import Collection, CollectionReader
import arvados_testutil as tutil
+import keepstub
class TestResumableWriter(arvados.ResumableCollectionWriter):
KEEP_BLOCK_SIZE = 1024 # PUT to Keep every 1K.
@@ -1215,5 +1217,72 @@ class CollectionCreateUpdateTest(run_test_server.TestCaseWithServers):
self.assertRegexpMatches(c1.manifest_text(), r"\. e65075d550f9b5bf9992fa1d71a131be\+3\S* 7ac66c0f148de9519b8bd264312c4d64\+7\S* 0:3:count\.txt 3:7:count\.txt~\d\d\d\d\d\d\d\d-\d\d\d\d\d\d~conflict~$")
+class CollectionAsyncWrites(keepstub.TestCase, unittest.TestCase):
+ DATA = bytearray(1<<25)
+ LOCATOR = '7f614da9329cd3aebf59b91aadc30bf0+67108864'
+ TEST_TIMEOUT = 10.0
+ BUFFERED_BLOCKS = 4
+ EXPECT_REPLICAS = 2
+ EXPECT_THREADS = EXPECT_REPLICAS * BUFFERED_BLOCKS
+ STUB_COUNT = 5
+ STUB_TYPE = 'disk'
+
+ def setUp(self):
+ super(CollectionAsyncWrites, self).setUp()
+ self._wait_cond = threading.Condition()
+ self._wait_count = 0
+ self._release = threading.Event()
+
+ def wait_and_release(self, tag=None):
+ def f():
+ _cond = self._wait_cond
+ _release = self._release
+ _cond.acquire()
+ self._wait_count += 1
+ print 'wait_and_release({}): {}'.format(self._wait_count, tag)
+ _cond.notify()
+ _cond.release()
+ _release.wait()
+ return self.LOCATOR
+ return f
+
+ def write_data(self):
+ c = arvados.collection.Collection(api_client=self.api_client, replication_desired=2)
+ c._my_block_manager().num_put_threads = self.BUFFERED_BLOCKS
+ for i in range(self.BUFFERED_BLOCKS * arvados.config.KEEP_BLOCK_SIZE / len(self.DATA)):
+ f = c.open('file' + str(i) + '.txt', 'w')
+ f.write(self.DATA)
+ f.close(flush=False)
+
+ def test_concurrent_writes_mocked_pycurl(self):
+ with tutil.mock_keep_responses(self.wait_and_release(), *(200 for _ in range(20))):
+ self._test_queued_writes()
+
+ def test_concurrent_writes_mocked_server(self):
+ for server in self._stub_servers:
+ server.setdelays(response=self.wait_and_release(server.server_address))
+ self._test_queued_writes()
+
+ def _test_queued_writes(self):
+ writer = threading.Thread(target=self.write_data)
+ writer.daemon = True
+ writer.start()
+ self._wait_cond.acquire()
+ try:
+ while self._wait_count < self.EXPECT_THREADS:
+ _was = self._wait_count
+ self._wait_cond.wait(self.TEST_TIMEOUT)
+ self.assertTrue(_was < self._wait_count, 'timeout: only {} threads have called put(), expect {}'.format(self._wait_count, self.EXPECT_THREADS))
+ finally:
+ self._wait_cond.release()
+ self._release.set()
+ writer.join(self.TEST_TIMEOUT)
+
+
+class CollectionAsyncWritesViaProxy(CollectionAsyncWrites):
+ STUB_TYPE = 'proxy'
+ EXPECT_THREADS = CollectionAsyncWrites.BUFFERED_BLOCKS
+
+
if __name__ == '__main__':
unittest.main()
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 908539b..c533a64 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -660,13 +660,18 @@ class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
-class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
+class KeepClientTimeout(keepstub.TestCase, unittest.TestCase):
# BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
# 1s worth of data and then trigger bandwidth errors before running
# out of data.
DATA = 'x'*2**11
BANDWIDTH_LOW_LIM = 1024
TIMEOUT_TIME = 1.0
+ STUB_COUNT = 1
+
+ def setUp(self):
+ super(KeepClientTimeout, self).setUp()
+ self.server = self._stub_servers[0]
class assertTakesBetween(unittest.TestCase):
def __init__(self, tmin, tmax):
@@ -694,24 +699,6 @@ class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
delta = round(time.time() - self.t0, 3)
self.assertGreaterEqual(delta, self.tmin)
- def setUp(self):
- sock = socket.socket()
- sock.bind(('0.0.0.0', 0))
- self.port = sock.getsockname()[1]
- sock.close()
- self.server = keepstub.Server(('0.0.0.0', self.port), keepstub.Handler)
- self.thread = threading.Thread(target=self.server.serve_forever)
- self.thread.daemon = True # Exit thread if main proc exits
- self.thread.start()
- self.api_client = self.mock_keep_services(
- count=1,
- service_host='localhost',
- service_port=self.port,
- )
-
- def tearDown(self):
- self.server.shutdown()
-
def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
return arvados.KeepClient(
api_client=self.api_client,
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list