[ARVADOS] updated: bef2f8e7bc3bd70b286971885294e5da2729da7b
Git user
git at public.curoverse.com
Fri Jun 24 18:10:35 EDT 2016
Summary of changes:
sdk/python/arvados/keep.py | 244 ++++++++---------------------------
sdk/python/tests/test_keep_client.py | 103 +++++++--------
2 files changed, 102 insertions(+), 245 deletions(-)
via bef2f8e7bc3bd70b286971885294e5da2729da7b (commit)
via 9ccdea630f00035e96eb2bac539a4bd0d3df196d (commit)
from dea15524cdbe9f791722fc956e939ce593e450b6 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit bef2f8e7bc3bd70b286971885294e5da2729da7b
Author: Lucas Di Pentima <lucas at curoverse.com>
Date: Fri Jun 24 19:10:22 2016 -0300
PySDK put() refactoring ready, all local tests worked OK
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index e0e7797..682ca0e 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -237,90 +237,6 @@ class KeepClient(object):
DEFAULT_TIMEOUT = (2, 256, 32768)
DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
- class ThreadLimiter(object):
- """Limit the number of threads writing to Keep at once.
-
- This ensures that only a number of writer threads that could
- potentially achieve the desired replication level run at once.
- Once the desired replication level is achieved, queued threads
- are instructed not to run.
-
- Should be used in a "with" block.
- """
- def __init__(self, want_copies, max_service_replicas):
- self._started = 0
- self._want_copies = want_copies
- self._done = 0
- self._thread_failures = 0
- self._response = None
- self._start_lock = threading.Condition()
- if (not max_service_replicas) or (max_service_replicas >= want_copies):
- max_threads = 1
- else:
- max_threads = math.ceil(float(want_copies) / max_service_replicas)
- _logger.debug("Limiter max threads is %d", max_threads)
- self._todo_lock = threading.Semaphore(max_threads)
- self._done_lock = threading.Lock()
- self._thread_failures_lock = threading.Lock()
- self._local = threading.local()
-
- def __enter__(self):
- self._start_lock.acquire()
- if getattr(self._local, 'sequence', None) is not None:
- # If the calling thread has used set_sequence(N), then
- # we wait here until N other threads have started.
- while self._started < self._local.sequence:
- self._start_lock.wait()
- self._todo_lock.acquire()
- self._started += 1
- self._start_lock.notifyAll()
- self._start_lock.release()
- return self
-
- def __exit__(self, type, value, traceback):
- with self._thread_failures_lock:
- if self._thread_failures > 0:
- self._thread_failures -= 1
- self._todo_lock.release()
-
- # If work is finished, release al pending threads
- if not self.shall_i_proceed():
- self._todo_lock.release()
-
- def set_sequence(self, sequence):
- self._local.sequence = sequence
-
- def shall_i_proceed(self):
- """
- Return true if the current thread should write to Keep.
- Return false otherwise.
- """
- with self._done_lock:
- return (self._done < self._want_copies)
-
- def save_response(self, response_body, replicas_stored):
- """
- Records a response body (a locator, possibly signed) returned by
- the Keep server, and the number of replicas it stored.
- """
- if replicas_stored == 0:
- # Failure notification, should start a new thread to try to reach full replication
- with self._thread_failures_lock:
- self._thread_failures += 1
- else:
- with self._done_lock:
- self._done += replicas_stored
- self._response = response_body
-
- def response(self):
- """Return the body from the response to a PUT request."""
- with self._done_lock:
- return self._response
-
- def done(self):
- """Return the total number of replicas successfully stored."""
- with self._done_lock:
- return self._done
class KeepService(object):
"""Make requests to a single Keep service, and track results.
@@ -571,13 +487,15 @@ class KeepClient(object):
Queue.Queue.__init__(self) # Old-style superclass
self.wanted_copies = copies
self.successful_copies = 0
+ self.response = None
self.successful_copies_lock = threading.Lock()
self.retries = copies
self.retries_notification = threading.Condition()
- def write_success(self, replicas_nr):
+ def write_success(self, response, replicas_nr):
with self.successful_copies_lock:
self.successful_copies += replicas_nr
+ self.response = response
def write_fail(self, ks, status_code):
with self.retries_notification:
@@ -589,38 +507,47 @@ class KeepClient(object):
class KeepWriterThreadPool(object):
- def __init__(self, data, data_hash, copies=2, num_threads=None):
+ def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
+ self.total_task_nr = 0
self.wanted_copies = copies
- if num_threads is None:
- num_threads = copies
+ if (not max_service_replicas) or (max_service_replicas >= copies):
+ num_threads = 1
+ else:
+ num_threads = int(math.ceil(float(copies) / max_service_replicas))
+ _logger.debug("Pool max threads is %d", num_threads)
self.workers = []
self.queue = KeepClient.KeepWriterQueue(copies)
- # Start workers
+ # Create workers
for _ in range(num_threads):
- w = KeepClient.KeepWriterThreadNew(self.queue, data, data_hash)
+ w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
self.workers.append(w)
def add_task(self, ks, service_root):
self.queue.put((ks, service_root))
+ self.total_task_nr += 1
def done(self):
return self.queue.successful_copies
- def start(self):
+ def join(self):
+ # Start workers
for worker in self.workers:
worker.start()
-
- def join(self):
+ # Wait for finished work
self.queue.join()
with self.queue.retries_notification:
self.queue.retries_notification.notify_all()
for worker in self.workers:
worker.join()
+
+ def response(self):
+ return self.queue.response
- class KeepWriterThreadNew(threading.Thread):
- def __init__(self, queue, data, data_hash):
- super(KeepClient.KeepWriterThreadNew, self).__init__()
+ class KeepWriterThread(threading.Thread):
+ def __init__(self, queue, data, data_hash, timeout=None):
+ super(KeepClient.KeepWriterThread, self).__init__()
+ self.timeout = timeout
self.queue = queue
self.data = data
self.data_hash = data_hash
@@ -636,11 +563,16 @@ class KeepClient(object):
self.queue.retries -= 1
# Get to work
- service, service_root = self.queue.get()
-
+ try:
+ service, service_root = self.queue.get_nowait()
+ except Queue.Empty:
+ continue
+ if service.finished():
+ self.queue.task_done()
+ continue
success = bool(service.put(self.data_hash,
self.data,
- timeout=None))
+ timeout=self.timeout))
result = service.last_result()
if success:
_logger.debug("KeepWriterThread %s succeeded %s+%i %s",
@@ -652,7 +584,8 @@ class KeepClient(object):
replicas_stored = int(result['headers']['x-keep-replicas-stored'])
except (KeyError, ValueError):
replicas_stored = 1
- self.queue.write_success(replicas_stored)
+
+ self.queue.write_success(result['body'].strip(), replicas_stored)
else:
if result.get('status_code', None):
_logger.debug("Request fail: PUT %s => %s %s",
@@ -662,77 +595,14 @@ class KeepClient(object):
self.queue.write_fail(service, result.get('status_code', None)) # Schedule a retry
else:
# Remove the task from the queue anyways
- self.queue.get()
+ try:
+ self.queue.get_nowait()
+ except Queue.Empty:
+ continue
# Mark as done so the queue can be join()ed
self.queue.task_done()
- class KeepWriterThread(threading.Thread):
- """
- Write a blob of data to the given Keep server. On success, call
- save_response() of the given ThreadLimiter to save the returned
- locator.
- """
- def __init__(self, keep_service, **kwargs):
- super(KeepClient.KeepWriterThread, self).__init__()
- self.service = keep_service
- self.args = kwargs
- self._success = False
-
- def success(self):
- return self._success
-
- def run(self):
- limiter = self.args['thread_limiter']
- sequence = self.args['thread_sequence']
- if sequence is not None:
- limiter.set_sequence(sequence)
- with limiter:
- if not limiter.shall_i_proceed():
- # My turn arrived, but the job has been done without
- # me.
- return
- self.run_with_limiter(limiter)
-
- def run_with_limiter(self, limiter):
- if self.service.finished():
- return
- _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
- str(threading.current_thread()),
- self.args['data_hash'],
- len(self.args['data']),
- self.args['service_root'])
- self._success = bool(self.service.put(
- self.args['data_hash'],
- self.args['data'],
- timeout=self.args.get('timeout', None)))
- result = self.service.last_result()
- if self._success:
- _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
- str(threading.current_thread()),
- self.args['data_hash'],
- len(self.args['data']),
- self.args['service_root'])
- # Tick the 'done' counter for the number of replica
- # reported stored by the server, for the case that
- # we're talking to a proxy or other backend that
- # stores to multiple copies for us.
- try:
- replicas_stored = int(result['headers']['x-keep-replicas-stored'])
- except (KeyError, ValueError):
- replicas_stored = 1
- limiter.save_response(result['body'].strip(), replicas_stored)
- elif result.get('status_code', None):
- _logger.debug("Request fail: PUT %s => %s %s",
- self.args['data_hash'],
- result['status_code'],
- result['body'])
- if not self._success:
- # Notify the failure so that the Thread limiter allows
- # a new one to run.
- limiter.save_response(None, 0)
-
-
def __init__(self, api_client=None, proxy=None,
timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
api_token=None, local_store=None, block_cache=None,
@@ -1169,30 +1039,22 @@ class KeepClient(object):
loop.save_result(error)
continue
- thread_limiter = KeepClient.ThreadLimiter(
- copies - done, self.max_replicas_per_service)
- threads = []
+ writer_pool = KeepClient.KeepWriterThreadPool(data=data,
+ data_hash=data_hash,
+ copies=copies - done,
+ max_service_replicas=self.max_replicas_per_service,
+ timeout=self.current_timeout(num_retries - tries_left))
for service_root, ks in [(root, roots_map[root])
for root in sorted_roots]:
if ks.finished():
continue
- t = KeepClient.KeepWriterThread(
- ks,
- data=data,
- data_hash=data_hash,
- service_root=service_root,
- thread_limiter=thread_limiter,
- timeout=self.current_timeout(num_retries-tries_left),
- thread_sequence=len(threads))
- t.start()
- threads.append(t)
- for t in threads:
- t.join()
- done += thread_limiter.done()
- loop.save_result((done >= copies, len(threads)))
+ writer_pool.add_task(ks, service_root)
+ writer_pool.join()
+ done += writer_pool.done()
+ loop.save_result((done >= copies, writer_pool.total_task_nr))
if loop.success():
- return thread_limiter.response()
+ return writer_pool.response()
if not roots_map:
raise arvados.errors.KeepWriteError(
"failed to write {}: no Keep services available ({})".format(
@@ -1203,7 +1065,7 @@ class KeepClient(object):
if roots_map[key].last_result()['error'])
raise arvados.errors.KeepWriteError(
"failed to write {} (wanted {} copies but wrote {})".format(
- data_hash, copies, thread_limiter.done()), service_errors, label="service")
+ data_hash, copies, writer_pool.done()), service_errors, label="service")
def local_store_put(self, data, copies=1, num_retries=None):
"""A stub for put().
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 314b93d..3befc09 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -1065,7 +1065,7 @@ class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
self.check_exception(copies=2, num_retries=3)
-class KeepClientAvoidClientOverreplicationTestCaseNew(unittest.TestCase, tutil.ApiClientMock):
+class KeepClientAvoidClientOverreplicationTestCase(unittest.TestCase, tutil.ApiClientMock):
class FakeKeepService(object):
@@ -1075,6 +1075,7 @@ class KeepClientAvoidClientOverreplicationTestCaseNew(unittest.TestCase, tutil.A
self._result = {}
self._result['headers'] = {}
self._result['headers']['x-keep-replicas-stored'] = str(replicas)
+ self._result['body'] = 'foobar'
def put(self, data_hash, data, timeout):
time.sleep(self.delay)
@@ -1082,6 +1083,9 @@ class KeepClientAvoidClientOverreplicationTestCaseNew(unittest.TestCase, tutil.A
def last_result(self):
return self._result
+
+ def finished(self):
+ return False
def test_only_write_enough_on_success(self):
@@ -1089,12 +1093,12 @@ class KeepClientAvoidClientOverreplicationTestCaseNew(unittest.TestCase, tutil.A
pool = arvados.KeepClient.KeepWriterThreadPool(
data = 'foo',
data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
+ max_service_replicas = copies,
copies = copies
)
for i in range(10):
ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
pool.add_task(ks, None)
- pool.start()
pool.join()
self.assertEqual(pool.done(), copies)
@@ -1103,6 +1107,7 @@ class KeepClientAvoidClientOverreplicationTestCaseNew(unittest.TestCase, tutil.A
pool = arvados.KeepClient.KeepWriterThreadPool(
data = 'foo',
data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
+ max_service_replicas = copies,
copies = copies
)
for i in range(5):
@@ -1110,72 +1115,10 @@ class KeepClientAvoidClientOverreplicationTestCaseNew(unittest.TestCase, tutil.A
pool.add_task(ks, None)
ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
pool.add_task(ks, None)
- pool.start()
pool.join()
self.assertEqual(pool.done(), copies)
-class KeepClientAvoidClientOverreplicationTestCase(unittest.TestCase, tutil.ApiClientMock):
-
-
- class KeepFakeWriterThread(threading.Thread):
- """
- Just Simulating the real KeepClient.KeepWriterThread, to test the ThreadLimiter.
- """
- def __init__(self, delay, will_succeed, thread_limiter):
- super(KeepClientAvoidClientOverreplicationTestCase.KeepFakeWriterThread, self).__init__()
- self.delay = delay # in seconds
- self.success = will_succeed
- self.limiter = thread_limiter
-
- def run(self):
- with self.limiter:
- if not self.limiter.shall_i_proceed():
- return
- time.sleep(self.delay)
- if self.success:
- self.limiter.save_response('foo', 1)
- else:
- self.limiter.save_response(None, 0)
-
- def test_only_write_enough_on_success(self):
- copies = 3
- threads = []
- limiter = arvados.KeepClient.ThreadLimiter(want_copies=copies, max_service_replicas=1)
- # Setting up fake writer threads with different delays so that the bug is revealed
- for i in range(copies*2):
- t = self.KeepFakeWriterThread(
- delay=i/10.0,
- will_succeed=True,
- thread_limiter=limiter)
- t.start()
- threads.append(t)
- for t in threads:
- t.join()
- self.assertEqual(limiter.done(), copies)
-
- def test_only_write_enough_on_partial_failure(self):
- copies = 3
- threads = []
- limiter = arvados.KeepClient.ThreadLimiter(want_copies=copies, max_service_replicas=1)
- for i in range(copies):
- t = self.KeepFakeWriterThread(
- delay=i/10.0,
- will_succeed=False,
- thread_limiter=limiter)
- t.start()
- threads.append(t)
- t = self.KeepFakeWriterThread(
- delay=i/10.0,
- will_succeed=True,
- thread_limiter=limiter)
- t.start()
- threads.append(t)
- for t in threads:
- t.join()
- self.assertEqual(limiter.done(), copies)
-
-
@tutil.skip_sleep
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
# Test put()s that need two distinct servers to succeed, possibly
commit 9ccdea630f00035e96eb2bac539a4bd0d3df196d
Author: Lucas Di Pentima <lucas at curoverse.com>
Date: Fri Jun 24 12:34:55 2016 -0300
Some tests done on new code
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 5d511e4..e0e7797 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -568,7 +568,7 @@ class KeepClient(object):
class KeepWriterQueue(Queue.Queue):
def __init__(self, copies):
- super(KeepClient.KeepWriterQueue, self).__init__()
+ Queue.Queue.__init__(self) # Old-style superclass
self.wanted_copies = copies
self.successful_copies = 0
self.successful_copies_lock = threading.Lock()
@@ -589,18 +589,21 @@ class KeepClient(object):
class KeepWriterThreadPool(object):
- def __init__(self, data, data_hash, num_threads, copies=2):
+ def __init__(self, data, data_hash, copies=2, num_threads=None):
self.wanted_copies = copies
+ if num_threads is None:
+ num_threads = copies
self.workers = []
self.queue = KeepClient.KeepWriterQueue(copies)
# Start workers
for _ in range(num_threads):
- self.workers.append(KeepClient.KeepWriterThreadNew(self.queue, data, data_hash))
+ w = KeepClient.KeepWriterThreadNew(self.queue, data, data_hash)
+ self.workers.append(w)
def add_task(self, ks, service_root):
self.queue.put((ks, service_root))
- def successful_copies(self):
+ def done(self):
return self.queue.successful_copies
def start(self):
@@ -621,7 +624,6 @@ class KeepClient(object):
self.queue = queue
self.data = data
self.data_hash = data_hash
- self.daemon = True
def run(self):
while not self.queue.empty():
@@ -636,9 +638,9 @@ class KeepClient(object):
# Get to work
service, service_root = self.queue.get()
- success = bool(self.service.put(self.data_hash,
- self.data,
- timeout=None))
+ success = bool(service.put(self.data_hash,
+ self.data,
+ timeout=None))
result = service.last_result()
if success:
_logger.debug("KeepWriterThread %s succeeded %s+%i %s",
@@ -1153,7 +1155,7 @@ class KeepClient(object):
headers = {}
# Tell the proxy how many copies we want it to store
- headers['X-Keep-Desired-Replication'] = str(copies)
+ headers['X-Keep-Desired-Replicas'] = str(copies)
roots_map = {}
loop = retry.RetryLoop(num_retries, self._check_loop_result,
backoff_start=2)
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 977f05e..314b93d 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -1065,6 +1065,56 @@ class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
self.check_exception(copies=2, num_retries=3)
+class KeepClientAvoidClientOverreplicationTestCaseNew(unittest.TestCase, tutil.ApiClientMock):
+
+
+ class FakeKeepService(object):
+ def __init__(self, delay, will_succeed, replicas=1):
+ self.delay = delay
+ self.success = will_succeed
+ self._result = {}
+ self._result['headers'] = {}
+ self._result['headers']['x-keep-replicas-stored'] = str(replicas)
+
+ def put(self, data_hash, data, timeout):
+ time.sleep(self.delay)
+ return self.success
+
+ def last_result(self):
+ return self._result
+
+
+ def test_only_write_enough_on_success(self):
+ copies = 3
+ pool = arvados.KeepClient.KeepWriterThreadPool(
+ data = 'foo',
+ data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
+ copies = copies
+ )
+ for i in range(10):
+ ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
+ pool.add_task(ks, None)
+ pool.start()
+ pool.join()
+ self.assertEqual(pool.done(), copies)
+
+ def test_only_write_enough_on_partial_success(self):
+ copies = 3
+ pool = arvados.KeepClient.KeepWriterThreadPool(
+ data = 'foo',
+ data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
+ copies = copies
+ )
+ for i in range(5):
+ ks = self.FakeKeepService(delay=i/10.0, will_succeed=False)
+ pool.add_task(ks, None)
+ ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
+ pool.add_task(ks, None)
+ pool.start()
+ pool.join()
+ self.assertEqual(pool.done(), copies)
+
+
class KeepClientAvoidClientOverreplicationTestCase(unittest.TestCase, tutil.ApiClientMock):
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list