[ARVADOS] updated: 2.1.0-791-ga6419676c

Git user git at public.arvados.org
Mon May 31 20:11:29 UTC 2021


Summary of changes:
 sdk/python/arvados/keep.py           | 85 ++++++++++++++++++++++---------
 sdk/python/tests/test_keep_client.py | 99 +++++++++++++++++++++++++++++++++---
 2 files changed, 155 insertions(+), 29 deletions(-)

       via  a6419676c073a863232c4656f0602b2d038ec3cd (commit)
       via  864baf0627216bb82475ed0323b4107f47cd7fd5 (commit)
      from  4c8b0d8f326cfe65ad98d948f62f7478db099afa (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit a6419676c073a863232c4656f0602b2d038ec3cd
Author: Lucas Di Pentima <lucas.dipentima at curii.com>
Date:   Mon May 31 17:05:57 2021 -0300

    17465: Adds tests for storage classes support.
    
    Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <lucas.dipentima at curii.com>

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index b11578f4c..e3ef642c9 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -571,7 +571,7 @@ class KeepClient(object):
             self.response = None
             self.storage_classes_tracking = True
             self.queue_data_lock = threading.Lock()
-            self.pending_tries = max(copies, len(classes))
+            self.pending_tries = max(copies, len(classes))+1
             self.pending_tries_notification = threading.Condition()
 
         def write_success(self, response, replicas_nr, classes_confirmed):
@@ -608,7 +608,7 @@ class KeepClient(object):
 
         def pending_classes(self):
             with self.queue_data_lock:
-                if self.wanted_storage_classes is None:
+                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
                     return []
                 unsatisfied_classes = copy.copy(self.wanted_storage_classes)
                 for st_class, st_copies in self.confirmed_storage_classes.items():
@@ -710,6 +710,7 @@ class KeepClient(object):
             classes = self.queue.pending_classes()
             headers = {}
             if len(classes) > 0:
+                classes.sort()
                 headers['X-Keep-Storage-Classes'] = ', '.join(classes)
             success = bool(service.put(self.data_hash,
                                         self.data,
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index cae2308f6..cdc492b51 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -532,6 +532,92 @@ class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
         # First reponse was not cached because it was from a HEAD request.
         self.assertNotEqual(head_resp, get_resp)
 
+ at tutil.skip_sleep
+class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock):
+    def setUp(self):
+        self.api_client = self.mock_keep_services(count=2)
+        self.keep_client = arvados.KeepClient(api_client=self.api_client)
+        self.data = b'xyzzy'
+        self.locator = '1271ed5ef305aadabc605b1609e24c52'
+
+    def test_storage_classes_req_header(self):
+        cases = [
+            # requested, expected
+            [['foo'], 'X-Keep-Storage-Classes: foo'],
+            [['bar', 'foo'], 'X-Keep-Storage-Classes: bar, foo'],
+            [[], None],
+        ]
+        for req_classes, expected_header in cases:
+            headers = {'x-keep-replicas-stored': 1}
+            if len(req_classes) > 0:
+                confirmed_hdr = ', '.join(["{}=1".format(cls) for cls in req_classes])
+                headers.update({'x-keep-storage-classes-confirmed': confirmed_hdr})
+            with tutil.mock_keep_responses(self.locator, 200, **headers) as mock:
+                self.keep_client.put(self.data, copies=1, classes=req_classes)
+                resp = mock.responses[0]
+                if expected_header is not None:
+                    self.assertIn(expected_header, resp.getopt(pycurl.HTTPHEADER))
+                else:
+                    for hdr in resp.getopt(pycurl.HTTPHEADER):
+                        self.assertNotRegex(hdr, r'^X-Keep-Storage-Classes.*')
+
+    def test_partial_storage_classes_put(self):
+        headers = {
+            'x-keep-replicas-stored': 1,
+            'x-keep-storage-classes-confirmed': 'foo=1'}
+        with tutil.mock_keep_responses(self.locator, 200, 503, **headers) as mock:
+            with self.assertRaises(arvados.errors.KeepWriteError):
+                self.keep_client.put(self.data, copies=1, classes=['foo', 'bar'])
+            # 1st request, both classes pending
+            req1_headers = mock.responses[0].getopt(pycurl.HTTPHEADER)
+            self.assertIn('X-Keep-Storage-Classes: bar, foo', req1_headers)
+            # 2nd try, 'foo' class already satisfied
+            req2_headers = mock.responses[1].getopt(pycurl.HTTPHEADER)
+            self.assertIn('X-Keep-Storage-Classes: bar', req2_headers)
+
+    def test_successful_storage_classes_put_requests(self):
+        cases = [
+            # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, expected_requests
+            [ 1, ['foo'], 1, 'foo=1', 1],
+            [ 1, ['foo'], 2, 'foo=2', 1],
+            [ 2, ['foo'], 2, 'foo=2', 1],
+            [ 2, ['foo'], 1, 'foo=1', 2],
+            [ 1, ['foo', 'bar'], 1, 'foo=1, bar=1', 1],
+            [ 1, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
+            [ 2, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
+            [ 2, ['foo', 'bar'], 1, 'foo=1, bar=1', 2],
+            [ 1, ['foo', 'bar'], 1, None, 1],
+            [ 1, ['foo'], 1, None, 1],
+            [ 2, ['foo'], 2, None, 1],
+            [ 2, ['foo'], 1, None, 2],
+        ]
+        for w_copies, w_classes, c_copies, c_classes, e_reqs in cases:
+            headers = {'x-keep-replicas-stored': c_copies}
+            if c_classes is not None:
+                headers.update({'x-keep-storage-classes-confirmed': c_classes})
+            with tutil.mock_keep_responses(self.locator, 200, 200, **headers) as mock:
+                case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}", expected_requests={}'.format(w_copies, ', '.join(w_classes), c_copies, c_classes, e_reqs)
+                self.assertEqual(self.locator,
+                    self.keep_client.put(self.data, copies=w_copies, classes=w_classes),
+                    case_desc)
+                self.assertEqual(e_reqs, mock.call_count, case_desc)
+
+    def test_failed_storage_classes_put_requests(self):
+        cases = [
+            # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, return_code
+            [ 1, ['foo'], 1, 'bar=1', 200],
+            [ 1, ['foo'], 1, None, 503],
+            [ 2, ['foo'], 1, 'bar=1, foo=1', 200],
+            [ 2, ['foo, bar'], 1, 'bar=2, foo=1', 200],
+        ]
+        for w_copies, w_classes, c_copies, c_classes, return_code in cases:
+            headers = {'x-keep-replicas-stored': c_copies}
+            if c_classes is not None:
+                headers.update({'x-keep-storage-classes-confirmed': c_classes})
+            with tutil.mock_keep_responses(self.locator, return_code, **headers):
+                case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}"'.format(w_copies, ', '.join(w_classes), c_copies, c_classes)
+                with self.assertRaises(arvados.errors.KeepWriteError, msg=case_desc):
+                    self.keep_client.put(self.data, copies=w_copies, classes=w_classes)
 
 @tutil.skip_sleep
 class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):

commit 864baf0627216bb82475ed0323b4107f47cd7fd5
Author: Lucas Di Pentima <lucas.dipentima at curii.com>
Date:   Thu May 27 18:38:30 2021 -0300

    17465: Adds storage classes tracking and old cluster support.
    
    * Tracks which classes were confirmed and asks for the remaining ones on
      following retries.
    * When receiving a keepstore response without the proper header, it falls
      back to only saving the requested number of copies.
    * Updates tests to reflect code changes.
    
    Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <lucas.dipentima at curii.com>

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index eac25e9d3..b11578f4c 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -4,6 +4,7 @@
 
 from __future__ import absolute_import
 from __future__ import division
+import copy
 from future import standard_library
 from future.utils import native_str
 standard_library.install_aliases()
@@ -446,7 +447,9 @@ class KeepClient(object):
                 return None
             return self._result['body']
 
-        def put(self, hash_s, body, timeout=None):
+        def put(self, hash_s, body, timeout=None, headers={}):
+            put_headers = copy.copy(self.put_headers)
+            put_headers.update(headers)
             url = self.root + hash_s
             _logger.debug("Request: PUT %s", url)
             curl = self._get_user_agent()
@@ -470,7 +473,7 @@ class KeepClient(object):
                     curl.setopt(pycurl.INFILESIZE, len(body))
                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
                     curl.setopt(pycurl.HTTPHEADER, [
-                        '{}: {}'.format(k,v) for k,v in self.put_headers.items()])
+                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     if self.insecure:
@@ -566,18 +569,22 @@ class KeepClient(object):
             self.successful_copies = 0
             self.confirmed_storage_classes = {}
             self.response = None
+            self.storage_classes_tracking = True
             self.queue_data_lock = threading.Lock()
-            self.pending_tries = copies
+            self.pending_tries = max(copies, len(classes))
             self.pending_tries_notification = threading.Condition()
 
         def write_success(self, response, replicas_nr, classes_confirmed):
             with self.queue_data_lock:
                 self.successful_copies += replicas_nr
-                for st_class, st_copies in classes_confirmed.items():
-                    try:
-                        self.confirmed_storage_classes[st_class] += st_copies
-                    except KeyError:
-                        self.confirmed_storage_classes[st_class] = st_copies
+                if classes_confirmed is None:
+                    self.storage_classes_tracking = False
+                elif self.storage_classes_tracking:
+                    for st_class, st_copies in classes_confirmed.items():
+                        try:
+                            self.confirmed_storage_classes[st_class] += st_copies
+                        except KeyError:
+                            self.confirmed_storage_classes[st_class] = st_copies
                 self.response = response
             with self.pending_tries_notification:
                 self.pending_tries_notification.notify_all()
@@ -591,12 +598,22 @@ class KeepClient(object):
             with self.queue_data_lock:
                 return self.wanted_copies - self.successful_copies
 
+        def satisfied_classes(self):
+            with self.queue_data_lock:
+                if not self.storage_classes_tracking:
+                    # Notifies disabled storage classes expectation to
+                    # the outer loop.
+                    return None
+            return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
+
         def pending_classes(self):
             with self.queue_data_lock:
-                unsatisfied_classes = []
+                if self.wanted_storage_classes is None:
+                    return []
+                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
                 for st_class, st_copies in self.confirmed_storage_classes.items():
-                    if st_class in self.wanted_storage_classes and st_copies < self.wanted_copies:
-                        unsatisfied_classes.append(st_class)
+                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
+                        unsatisfied_classes.remove(st_class)
                 return unsatisfied_classes
 
         def get_next_task(self):
@@ -614,7 +631,7 @@ class KeepClient(object):
                         while True:
                             self.get_nowait()
                             self.task_done()
-                    elif self.pending_tries > 0 or len(self.pending_classes()) > 0:
+                    elif self.pending_tries > 0:
                         service, service_root = self.get_nowait()
                         if service.finished():
                             self.task_done()
@@ -648,7 +665,7 @@ class KeepClient(object):
             self.total_task_nr += 1
 
         def done(self):
-            return self.queue.successful_copies
+            return self.queue.successful_copies, self.queue.satisfied_classes()
 
         def join(self):
             # Start workers
@@ -690,9 +707,14 @@ class KeepClient(object):
                     self.queue.task_done()
 
         def do_task(self, service, service_root):
+            classes = self.queue.pending_classes()
+            headers = {}
+            if len(classes) > 0:
+                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
             success = bool(service.put(self.data_hash,
                                         self.data,
-                                        timeout=self.timeout))
+                                        timeout=self.timeout,
+                                        headers=headers))
             result = service.last_result()
 
             if not success:
@@ -722,7 +744,7 @@ class KeepClient(object):
                         classes_confirmed[stored_class] = int(stored_copies)
             except (KeyError, ValueError):
                 # Storage classes confirmed header missing or corrupt
-                classes_confirmed = {}
+                classes_confirmed = None
 
             return result['body'].strip(), replicas_stored, classes_confirmed
 
@@ -1187,12 +1209,11 @@ class KeepClient(object):
                              arvados.util.new_request_id()),
             'X-Keep-Desired-Replicas': str(copies),
         }
-        if len(classes) > 0:
-            headers['X-Keep-Storage-Classes'] = ', '.join(classes)
         roots_map = {}
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
-        done = 0
+        done_copies = 0
+        done_classes = []
         for tries_left in loop:
             try:
                 sorted_roots = self.map_new_services(
@@ -1204,20 +1225,37 @@ class KeepClient(object):
                 loop.save_result(error)
                 continue
 
+            pending_classes = []
+            if done_classes is not None:
+                pending_classes = list(set(classes) - set(done_classes))
             writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                         data_hash=data_hash,
-                                                        copies=copies - done,
+                                                        copies=copies - done_copies,
                                                         max_service_replicas=self.max_replicas_per_service,
                                                         timeout=self.current_timeout(num_retries - tries_left),
-                                                        classes=classes)
+                                                        classes=pending_classes)
             for service_root, ks in [(root, roots_map[root])
                                      for root in sorted_roots]:
                 if ks.finished():
                     continue
                 writer_pool.add_task(ks, service_root)
             writer_pool.join()
-            done += writer_pool.done()
-            loop.save_result((done >= copies, writer_pool.total_task_nr))
+            pool_copies, pool_classes = writer_pool.done()
+            done_copies += pool_copies
+            if (done_classes is not None) and (pool_classes is not None):
+                done_classes += pool_classes
+                loop.save_result(
+                    (done_copies >= copies and set(done_classes) == set(classes),
+                    writer_pool.total_task_nr))
+            else:
+                # Old keepstore contacted without storage classes support:
+                # success is determined only by successful copies.
+                #
+                # Disable storage classes tracking from this point forward.
+                _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
+                done_classes = None
+                loop.save_result(
+                    (done_copies >= copies, writer_pool.total_task_nr))
 
         if loop.success():
             return writer_pool.response()
@@ -1231,7 +1269,7 @@ class KeepClient(object):
                               if roots_map[key].last_result()['error'])
             raise arvados.errors.KeepWriteError(
                 "failed to write {} after {} (wanted {} copies but wrote {})".format(
-                    data_hash, loop.attempts_str(), copies, writer_pool.done()), service_errors, label="service")
+                    data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
 
     def local_store_put(self, data, copies=1, num_retries=None):
         """A stub for put().
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 27e3cf633..cae2308f6 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -70,7 +70,7 @@ class KeepTestCase(run_test_server.TestCaseWithServers):
 
     def test_KeepLongBinaryRWTest(self):
         blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
-        for i in range(0,23):
+        for i in range(0, 23):
             blob_data = blob_data + blob_data
         blob_locator = self.keep_client.put(blob_data)
         self.assertRegex(
@@ -1178,9 +1178,10 @@ class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
             self._result = {}
             self._result['headers'] = {}
             self._result['headers']['x-keep-replicas-stored'] = str(replicas)
+            self._result['headers']['x-keep-storage-classes-confirmed'] = 'default={}'.format(replicas)
             self._result['body'] = 'foobar'
 
-        def put(self, data_hash, data, timeout):
+        def put(self, data_hash, data, timeout, headers):
             time.sleep(self.delay)
             if self.will_raise is not None:
                 raise self.will_raise
@@ -1207,7 +1208,7 @@ class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
             ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
             self.pool.add_task(ks, None)
         self.pool.join()
-        self.assertEqual(self.pool.done(), self.copies)
+        self.assertEqual(self.pool.done(), (self.copies, []))
 
     def test_only_write_enough_on_partial_success(self):
         for i in range(5):
@@ -1216,7 +1217,7 @@ class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
             ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
             self.pool.add_task(ks, None)
         self.pool.join()
-        self.assertEqual(self.pool.done(), self.copies)
+        self.assertEqual(self.pool.done(), (self.copies, []))
 
     def test_only_write_enough_when_some_crash(self):
         for i in range(5):
@@ -1225,7 +1226,7 @@ class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
             ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
             self.pool.add_task(ks, None)
         self.pool.join()
-        self.assertEqual(self.pool.done(), self.copies)
+        self.assertEqual(self.pool.done(), (self.copies, []))
 
     def test_fail_when_too_many_crash(self):
         for i in range(self.copies+1):
@@ -1235,7 +1236,7 @@ class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
             ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
             self.pool.add_task(ks, None)
         self.pool.join()
-        self.assertEqual(self.pool.done(), self.copies-1)
+        self.assertEqual(self.pool.done(), (self.copies-1, []))
 
 
 @tutil.skip_sleep

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list