[ARVADOS] updated: 2.1.0-791-ga6419676c
Git user
git at public.arvados.org
Mon May 31 20:11:29 UTC 2021
Summary of changes:
sdk/python/arvados/keep.py | 85 ++++++++++++++++++++++---------
sdk/python/tests/test_keep_client.py | 99 +++++++++++++++++++++++++++++++++---
2 files changed, 155 insertions(+), 29 deletions(-)
via a6419676c073a863232c4656f0602b2d038ec3cd (commit)
via 864baf0627216bb82475ed0323b4107f47cd7fd5 (commit)
from 4c8b0d8f326cfe65ad98d948f62f7478db099afa (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit a6419676c073a863232c4656f0602b2d038ec3cd
Author: Lucas Di Pentima <lucas.dipentima at curii.com>
Date: Mon May 31 17:05:57 2021 -0300
17465: Adds tests for storage classes support.
Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <lucas.dipentima at curii.com>
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index b11578f4c..e3ef642c9 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -571,7 +571,7 @@ class KeepClient(object):
self.response = None
self.storage_classes_tracking = True
self.queue_data_lock = threading.Lock()
- self.pending_tries = max(copies, len(classes))
+ self.pending_tries = max(copies, len(classes))+1
self.pending_tries_notification = threading.Condition()
def write_success(self, response, replicas_nr, classes_confirmed):
@@ -608,7 +608,7 @@ class KeepClient(object):
def pending_classes(self):
with self.queue_data_lock:
- if self.wanted_storage_classes is None:
+ if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
return []
unsatisfied_classes = copy.copy(self.wanted_storage_classes)
for st_class, st_copies in self.confirmed_storage_classes.items():
@@ -710,6 +710,7 @@ class KeepClient(object):
classes = self.queue.pending_classes()
headers = {}
if len(classes) > 0:
+ classes.sort()
headers['X-Keep-Storage-Classes'] = ', '.join(classes)
success = bool(service.put(self.data_hash,
self.data,
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index cae2308f6..cdc492b51 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -532,6 +532,92 @@ class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
# First reponse was not cached because it was from a HEAD request.
self.assertNotEqual(head_resp, get_resp)
+ at tutil.skip_sleep
+class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock):
+ def setUp(self):
+ self.api_client = self.mock_keep_services(count=2)
+ self.keep_client = arvados.KeepClient(api_client=self.api_client)
+ self.data = b'xyzzy'
+ self.locator = '1271ed5ef305aadabc605b1609e24c52'
+
+ def test_storage_classes_req_header(self):
+ cases = [
+ # requested, expected
+ [['foo'], 'X-Keep-Storage-Classes: foo'],
+ [['bar', 'foo'], 'X-Keep-Storage-Classes: bar, foo'],
+ [[], None],
+ ]
+ for req_classes, expected_header in cases:
+ headers = {'x-keep-replicas-stored': 1}
+ if len(req_classes) > 0:
+ confirmed_hdr = ', '.join(["{}=1".format(cls) for cls in req_classes])
+ headers.update({'x-keep-storage-classes-confirmed': confirmed_hdr})
+ with tutil.mock_keep_responses(self.locator, 200, **headers) as mock:
+ self.keep_client.put(self.data, copies=1, classes=req_classes)
+ resp = mock.responses[0]
+ if expected_header is not None:
+ self.assertIn(expected_header, resp.getopt(pycurl.HTTPHEADER))
+ else:
+ for hdr in resp.getopt(pycurl.HTTPHEADER):
+ self.assertNotRegex(hdr, r'^X-Keep-Storage-Classes.*')
+
+ def test_partial_storage_classes_put(self):
+ headers = {
+ 'x-keep-replicas-stored': 1,
+ 'x-keep-storage-classes-confirmed': 'foo=1'}
+ with tutil.mock_keep_responses(self.locator, 200, 503, **headers) as mock:
+ with self.assertRaises(arvados.errors.KeepWriteError):
+ self.keep_client.put(self.data, copies=1, classes=['foo', 'bar'])
+ # 1st request, both classes pending
+ req1_headers = mock.responses[0].getopt(pycurl.HTTPHEADER)
+ self.assertIn('X-Keep-Storage-Classes: bar, foo', req1_headers)
+ # 2nd try, 'foo' class already satisfied
+ req2_headers = mock.responses[1].getopt(pycurl.HTTPHEADER)
+ self.assertIn('X-Keep-Storage-Classes: bar', req2_headers)
+
+ def test_successful_storage_classes_put_requests(self):
+ cases = [
+ # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, expected_requests
+ [ 1, ['foo'], 1, 'foo=1', 1],
+ [ 1, ['foo'], 2, 'foo=2', 1],
+ [ 2, ['foo'], 2, 'foo=2', 1],
+ [ 2, ['foo'], 1, 'foo=1', 2],
+ [ 1, ['foo', 'bar'], 1, 'foo=1, bar=1', 1],
+ [ 1, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
+ [ 2, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
+ [ 2, ['foo', 'bar'], 1, 'foo=1, bar=1', 2],
+ [ 1, ['foo', 'bar'], 1, None, 1],
+ [ 1, ['foo'], 1, None, 1],
+ [ 2, ['foo'], 2, None, 1],
+ [ 2, ['foo'], 1, None, 2],
+ ]
+ for w_copies, w_classes, c_copies, c_classes, e_reqs in cases:
+ headers = {'x-keep-replicas-stored': c_copies}
+ if c_classes is not None:
+ headers.update({'x-keep-storage-classes-confirmed': c_classes})
+ with tutil.mock_keep_responses(self.locator, 200, 200, **headers) as mock:
+ case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}", expected_requests={}'.format(w_copies, ', '.join(w_classes), c_copies, c_classes, e_reqs)
+ self.assertEqual(self.locator,
+ self.keep_client.put(self.data, copies=w_copies, classes=w_classes),
+ case_desc)
+ self.assertEqual(e_reqs, mock.call_count, case_desc)
+
+ def test_failed_storage_classes_put_requests(self):
+ cases = [
+ # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, return_code
+ [ 1, ['foo'], 1, 'bar=1', 200],
+ [ 1, ['foo'], 1, None, 503],
+ [ 2, ['foo'], 1, 'bar=1, foo=1', 200],
+ [ 2, ['foo, bar'], 1, 'bar=2, foo=1', 200],
+ ]
+ for w_copies, w_classes, c_copies, c_classes, return_code in cases:
+ headers = {'x-keep-replicas-stored': c_copies}
+ if c_classes is not None:
+ headers.update({'x-keep-storage-classes-confirmed': c_classes})
+ with tutil.mock_keep_responses(self.locator, return_code, **headers):
+ case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}"'.format(w_copies, ', '.join(w_classes), c_copies, c_classes)
+ with self.assertRaises(arvados.errors.KeepWriteError, msg=case_desc):
+ self.keep_client.put(self.data, copies=w_copies, classes=w_classes)
@tutil.skip_sleep
class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):
commit 864baf0627216bb82475ed0323b4107f47cd7fd5
Author: Lucas Di Pentima <lucas.dipentima at curii.com>
Date: Thu May 27 18:38:30 2021 -0300
17465: Adds storage classes tracking and old cluster support.
* Tracks which classes were confirmed and asks only for the remaining ones on
subsequent retries.
* When a keepstore response lacks the proper header, it falls back
to only saving the requested number of copies.
* Updates tests to reflect code changes.
Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <lucas.dipentima at curii.com>
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index eac25e9d3..b11578f4c 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -4,6 +4,7 @@
from __future__ import absolute_import
from __future__ import division
+import copy
from future import standard_library
from future.utils import native_str
standard_library.install_aliases()
@@ -446,7 +447,9 @@ class KeepClient(object):
return None
return self._result['body']
- def put(self, hash_s, body, timeout=None):
+ def put(self, hash_s, body, timeout=None, headers={}):
+ put_headers = copy.copy(self.put_headers)
+ put_headers.update(headers)
url = self.root + hash_s
_logger.debug("Request: PUT %s", url)
curl = self._get_user_agent()
@@ -470,7 +473,7 @@ class KeepClient(object):
curl.setopt(pycurl.INFILESIZE, len(body))
curl.setopt(pycurl.READFUNCTION, body_reader.read)
curl.setopt(pycurl.HTTPHEADER, [
- '{}: {}'.format(k,v) for k,v in self.put_headers.items()])
+ '{}: {}'.format(k,v) for k,v in put_headers.items()])
curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
if self.insecure:
@@ -566,18 +569,22 @@ class KeepClient(object):
self.successful_copies = 0
self.confirmed_storage_classes = {}
self.response = None
+ self.storage_classes_tracking = True
self.queue_data_lock = threading.Lock()
- self.pending_tries = copies
+ self.pending_tries = max(copies, len(classes))
self.pending_tries_notification = threading.Condition()
def write_success(self, response, replicas_nr, classes_confirmed):
with self.queue_data_lock:
self.successful_copies += replicas_nr
- for st_class, st_copies in classes_confirmed.items():
- try:
- self.confirmed_storage_classes[st_class] += st_copies
- except KeyError:
- self.confirmed_storage_classes[st_class] = st_copies
+ if classes_confirmed is None:
+ self.storage_classes_tracking = False
+ elif self.storage_classes_tracking:
+ for st_class, st_copies in classes_confirmed.items():
+ try:
+ self.confirmed_storage_classes[st_class] += st_copies
+ except KeyError:
+ self.confirmed_storage_classes[st_class] = st_copies
self.response = response
with self.pending_tries_notification:
self.pending_tries_notification.notify_all()
@@ -591,12 +598,22 @@ class KeepClient(object):
with self.queue_data_lock:
return self.wanted_copies - self.successful_copies
+ def satisfied_classes(self):
+ with self.queue_data_lock:
+ if not self.storage_classes_tracking:
+ # Notifies disabled storage classes expectation to
+ # the outer loop.
+ return None
+ return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
+
def pending_classes(self):
with self.queue_data_lock:
- unsatisfied_classes = []
+ if self.wanted_storage_classes is None:
+ return []
+ unsatisfied_classes = copy.copy(self.wanted_storage_classes)
for st_class, st_copies in self.confirmed_storage_classes.items():
- if st_class in self.wanted_storage_classes and st_copies < self.wanted_copies:
- unsatisfied_classes.append(st_class)
+ if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
+ unsatisfied_classes.remove(st_class)
return unsatisfied_classes
def get_next_task(self):
@@ -614,7 +631,7 @@ class KeepClient(object):
while True:
self.get_nowait()
self.task_done()
- elif self.pending_tries > 0 or len(self.pending_classes()) > 0:
+ elif self.pending_tries > 0:
service, service_root = self.get_nowait()
if service.finished():
self.task_done()
@@ -648,7 +665,7 @@ class KeepClient(object):
self.total_task_nr += 1
def done(self):
- return self.queue.successful_copies
+ return self.queue.successful_copies, self.queue.satisfied_classes()
def join(self):
# Start workers
@@ -690,9 +707,14 @@ class KeepClient(object):
self.queue.task_done()
def do_task(self, service, service_root):
+ classes = self.queue.pending_classes()
+ headers = {}
+ if len(classes) > 0:
+ headers['X-Keep-Storage-Classes'] = ', '.join(classes)
success = bool(service.put(self.data_hash,
self.data,
- timeout=self.timeout))
+ timeout=self.timeout,
+ headers=headers))
result = service.last_result()
if not success:
@@ -722,7 +744,7 @@ class KeepClient(object):
classes_confirmed[stored_class] = int(stored_copies)
except (KeyError, ValueError):
# Storage classes confirmed header missing or corrupt
- classes_confirmed = {}
+ classes_confirmed = None
return result['body'].strip(), replicas_stored, classes_confirmed
@@ -1187,12 +1209,11 @@ class KeepClient(object):
arvados.util.new_request_id()),
'X-Keep-Desired-Replicas': str(copies),
}
- if len(classes) > 0:
- headers['X-Keep-Storage-Classes'] = ', '.join(classes)
roots_map = {}
loop = retry.RetryLoop(num_retries, self._check_loop_result,
backoff_start=2)
- done = 0
+ done_copies = 0
+ done_classes = []
for tries_left in loop:
try:
sorted_roots = self.map_new_services(
@@ -1204,20 +1225,37 @@ class KeepClient(object):
loop.save_result(error)
continue
+ pending_classes = []
+ if done_classes is not None:
+ pending_classes = list(set(classes) - set(done_classes))
writer_pool = KeepClient.KeepWriterThreadPool(data=data,
data_hash=data_hash,
- copies=copies - done,
+ copies=copies - done_copies,
max_service_replicas=self.max_replicas_per_service,
timeout=self.current_timeout(num_retries - tries_left),
- classes=classes)
+ classes=pending_classes)
for service_root, ks in [(root, roots_map[root])
for root in sorted_roots]:
if ks.finished():
continue
writer_pool.add_task(ks, service_root)
writer_pool.join()
- done += writer_pool.done()
- loop.save_result((done >= copies, writer_pool.total_task_nr))
+ pool_copies, pool_classes = writer_pool.done()
+ done_copies += pool_copies
+ if (done_classes is not None) and (pool_classes is not None):
+ done_classes += pool_classes
+ loop.save_result(
+ (done_copies >= copies and set(done_classes) == set(classes),
+ writer_pool.total_task_nr))
+ else:
+ # Old keepstore contacted without storage classes support:
+ # success is determined only by successful copies.
+ #
+ # Disable storage classes tracking from this point forward.
+ _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
+ done_classes = None
+ loop.save_result(
+ (done_copies >= copies, writer_pool.total_task_nr))
if loop.success():
return writer_pool.response()
@@ -1231,7 +1269,7 @@ class KeepClient(object):
if roots_map[key].last_result()['error'])
raise arvados.errors.KeepWriteError(
"failed to write {} after {} (wanted {} copies but wrote {})".format(
- data_hash, loop.attempts_str(), copies, writer_pool.done()), service_errors, label="service")
+ data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
def local_store_put(self, data, copies=1, num_retries=None):
"""A stub for put().
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 27e3cf633..cae2308f6 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -70,7 +70,7 @@ class KeepTestCase(run_test_server.TestCaseWithServers):
def test_KeepLongBinaryRWTest(self):
blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
- for i in range(0,23):
+ for i in range(0, 23):
blob_data = blob_data + blob_data
blob_locator = self.keep_client.put(blob_data)
self.assertRegex(
@@ -1178,9 +1178,10 @@ class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
self._result = {}
self._result['headers'] = {}
self._result['headers']['x-keep-replicas-stored'] = str(replicas)
+ self._result['headers']['x-keep-storage-classes-confirmed'] = 'default={}'.format(replicas)
self._result['body'] = 'foobar'
- def put(self, data_hash, data, timeout):
+ def put(self, data_hash, data, timeout, headers):
time.sleep(self.delay)
if self.will_raise is not None:
raise self.will_raise
@@ -1207,7 +1208,7 @@ class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
self.pool.add_task(ks, None)
self.pool.join()
- self.assertEqual(self.pool.done(), self.copies)
+ self.assertEqual(self.pool.done(), (self.copies, []))
def test_only_write_enough_on_partial_success(self):
for i in range(5):
@@ -1216,7 +1217,7 @@ class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
self.pool.add_task(ks, None)
self.pool.join()
- self.assertEqual(self.pool.done(), self.copies)
+ self.assertEqual(self.pool.done(), (self.copies, []))
def test_only_write_enough_when_some_crash(self):
for i in range(5):
@@ -1225,7 +1226,7 @@ class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
self.pool.add_task(ks, None)
self.pool.join()
- self.assertEqual(self.pool.done(), self.copies)
+ self.assertEqual(self.pool.done(), (self.copies, []))
def test_fail_when_too_many_crash(self):
for i in range(self.copies+1):
@@ -1235,7 +1236,7 @@ class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
self.pool.add_task(ks, None)
self.pool.join()
- self.assertEqual(self.pool.done(), self.copies-1)
+ self.assertEqual(self.pool.done(), (self.copies-1, []))
@tutil.skip_sleep
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list