[ARVADOS] created: 2.1.0-789-g4c8b0d8f3

Git user git at public.arvados.org
Fri May 21 00:07:35 UTC 2021


        at  4c8b0d8f326cfe65ad98d948f62f7478db099afa (commit)


commit 4c8b0d8f326cfe65ad98d948f62f7478db099afa
Author: Lucas Di Pentima <lucas.dipentima at curii.com>
Date:   Thu May 20 21:06:43 2021 -0300

    17465: Adds storage classes support to PySDK put().
    
    Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <lucas.dipentima at curii.com>

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index bd0e5dc1e..eac25e9d3 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -559,18 +559,25 @@ class KeepClient(object):
 
 
     class KeepWriterQueue(queue.Queue):
-        def __init__(self, copies):
+        def __init__(self, copies, classes=[]):
             queue.Queue.__init__(self) # Old-style superclass
             self.wanted_copies = copies
+            self.wanted_storage_classes = classes
             self.successful_copies = 0
+            self.confirmed_storage_classes = {}
             self.response = None
-            self.successful_copies_lock = threading.Lock()
+            self.queue_data_lock = threading.Lock()
             self.pending_tries = copies
             self.pending_tries_notification = threading.Condition()
 
-        def write_success(self, response, replicas_nr):
-            with self.successful_copies_lock:
+        def write_success(self, response, replicas_nr, classes_confirmed):
+            with self.queue_data_lock:
                 self.successful_copies += replicas_nr
+                for st_class, st_copies in classes_confirmed.items():
+                    try:
+                        self.confirmed_storage_classes[st_class] += st_copies
+                    except KeyError:
+                        self.confirmed_storage_classes[st_class] = st_copies
                 self.response = response
             with self.pending_tries_notification:
                 self.pending_tries_notification.notify_all()
@@ -581,13 +588,21 @@ class KeepClient(object):
                 self.pending_tries_notification.notify()
 
         def pending_copies(self):
-            with self.successful_copies_lock:
+            with self.queue_data_lock:
                 return self.wanted_copies - self.successful_copies
 
+        def pending_classes(self):
+            with self.queue_data_lock:
+                unsatisfied_classes = []
+                for st_class, st_copies in self.confirmed_storage_classes.items():
+                    if st_class in self.wanted_storage_classes and st_copies < self.wanted_copies:
+                        unsatisfied_classes.append(st_class)
+                return unsatisfied_classes
+
         def get_next_task(self):
             with self.pending_tries_notification:
                 while True:
-                    if self.pending_copies() < 1:
+                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
                         # This notify_all() is unnecessary --
                         # write_success() already called notify_all()
                         # when pending<1 became true, so it's not
@@ -599,7 +614,7 @@ class KeepClient(object):
                         while True:
                             self.get_nowait()
                             self.task_done()
-                    elif self.pending_tries > 0:
+                    elif self.pending_tries > 0 or len(self.pending_classes()) > 0:
                         service, service_root = self.get_nowait()
                         if service.finished():
                             self.task_done()
@@ -614,16 +629,15 @@ class KeepClient(object):
 
 
     class KeepWriterThreadPool(object):
-        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
+        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
             self.total_task_nr = 0
-            self.wanted_copies = copies
             if (not max_service_replicas) or (max_service_replicas >= copies):
                 num_threads = 1
             else:
                 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
             _logger.debug("Pool max threads is %d", num_threads)
             self.workers = []
-            self.queue = KeepClient.KeepWriterQueue(copies)
+            self.queue = KeepClient.KeepWriterQueue(copies, classes)
             # Create workers
             for _ in range(num_threads):
                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
@@ -665,13 +679,13 @@ class KeepClient(object):
                 except queue.Empty:
                     return
                 try:
-                    locator, copies = self.do_task(service, service_root)
+                    locator, copies, classes = self.do_task(service, service_root)
                 except Exception as e:
                     if not isinstance(e, self.TaskFailed):
                         _logger.exception("Exception in KeepWriterThread")
                     self.queue.write_fail(service)
                 else:
-                    self.queue.write_success(locator, copies)
+                    self.queue.write_success(locator, copies, classes)
                 finally:
                     self.queue.task_done()
 
@@ -699,7 +713,18 @@ class KeepClient(object):
             except (KeyError, ValueError):
                 replicas_stored = 1
 
-            return result['body'].strip(), replicas_stored
+            classes_confirmed = {}
+            try:
+                scch = result['headers']['x-keep-storage-classes-confirmed']
+                for confirmation in scch.replace(' ', '').split(','):
+                    if '=' in confirmation:
+                        stored_class, stored_copies = confirmation.split('=')[:2]
+                        classes_confirmed[stored_class] = int(stored_copies)
+            except (KeyError, ValueError):
+                # Storage classes confirmed header missing or corrupt
+                classes_confirmed = {}
+
+            return result['body'].strip(), replicas_stored, classes_confirmed
 
 
     def __init__(self, api_client=None, proxy=None,
@@ -1124,7 +1149,7 @@ class KeepClient(object):
                 "failed to read {} after {}".format(loc_s, loop.attempts_str()), service_errors, label="service")
 
     @retry.retry_method
-    def put(self, data, copies=2, num_retries=None, request_id=None):
+    def put(self, data, copies=2, num_retries=None, request_id=None, classes=[]):
         """Save data in Keep.
 
         This method will get a list of Keep services from the API server, and
@@ -1141,6 +1166,8 @@ class KeepClient(object):
           *each* Keep server if it returns temporary failures, with
           exponential backoff.  The default value is set when the
           KeepClient is initialized.
+        * classes: An optional list of storage class names where copies should
+          be written.
         """
 
         if not isinstance(data, bytes):
@@ -1160,6 +1187,8 @@ class KeepClient(object):
                              arvados.util.new_request_id()),
             'X-Keep-Desired-Replicas': str(copies),
         }
+        if len(classes) > 0:
+            headers['X-Keep-Storage-Classes'] = ', '.join(classes)
         roots_map = {}
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
@@ -1179,7 +1208,8 @@ class KeepClient(object):
                                                         data_hash=data_hash,
                                                         copies=copies - done,
                                                         max_service_replicas=self.max_replicas_per_service,
-                                                        timeout=self.current_timeout(num_retries - tries_left))
+                                                        timeout=self.current_timeout(num_retries - tries_left),
+                                                        classes=classes)
             for service_root, ks in [(root, roots_map[root])
                                      for root in sorted_roots]:
                 if ks.finished():

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list