[ARVADOS] updated: 2.1.0-794-g744bbbafa

Git user git at public.arvados.org
Wed Jun 2 20:09:13 UTC 2021


Summary of changes:
 sdk/python/arvados/arvfile.py        | 19 ++++++++----------
 sdk/python/arvados/collection.py     | 39 ++++++++++++++++++------------------
 sdk/python/tests/test_collections.py | 17 ++++++++++++++--
 3 files changed, 42 insertions(+), 33 deletions(-)

       via  744bbbafa5dcbba814391eaedfa9489c3614b644 (commit)
      from  aeb5185342b751b6bbbf1e17024d6f17417ffaf2 (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit 744bbbafa5dcbba814391eaedfa9489c3614b644
Author: Lucas Di Pentima <lucas.dipentima at curii.com>
Date:   Wed Jun 2 16:54:39 2021 -0300

    17465: Synchronizes storage classes data between Collection & BlockManager.
    
    Storage classes can be set when a Collection is instantiated, and BlockManager
    used to copy that data once, when it was instantiated by Collection. However,
    the desired storage classes can change during a Collection instance's lifetime,
    and new blocks should be written to the correct classes. The solution is to
    pass BlockManager a function it can call to query its parent Collection
    instance's settings at any time.
    
    Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <lucas.dipentima at curii.com>

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 7c6b732d3..e915ff2ac 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -481,7 +481,7 @@ class _BlockManager(object):
     DEFAULT_PUT_THREADS = 2
     DEFAULT_GET_THREADS = 2
 
-    def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes=[]):
+    def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None):
         """keep: KeepClient object to use"""
         self._keep = keep
         self._bufferblocks = collections.OrderedDict()
@@ -491,13 +491,10 @@ class _BlockManager(object):
         self._prefetch_threads = None
         self.lock = threading.Lock()
         self.prefetch_enabled = True
-        if put_threads:
-            self.num_put_threads = put_threads
-        else:
-            self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
+        self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
         self.copies = copies
-        self.storage_classes = storage_classes
+        self.storage_classes = storage_classes_func or (lambda: [])
         self._pending_write_size = 0
         self.threads_lock = threading.Lock()
         self.padding_block = None
@@ -556,9 +553,9 @@ class _BlockManager(object):
                     return
 
                 if self.copies is None:
-                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes)
+                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
                 else:
-                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes)
+                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
             except Exception as e:
                 bufferblock.set_state(_BufferBlock.ERROR, e)
@@ -573,7 +570,7 @@ class _BlockManager(object):
 
                 # If we don't limit the Queue size, the upload queue can quickly
                 # grow to take up gigabytes of RAM if the writing process is
-                # generating data more quickly than it can be send to the Keep
+                # generating data more quickly than it can be sent to the Keep
                 # servers.
                 #
                 # With two upload threads and a queue size of 2, this means up to 4
@@ -727,9 +724,9 @@ class _BlockManager(object):
         if sync:
             try:
                 if self.copies is None:
-                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes)
+                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
                 else:
-                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes)
+                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
                 block.set_state(_BufferBlock.COMMITTED, loc)
             except Exception as e:
                 block.set_state(_BufferBlock.ERROR, e)
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 319046dc0..3910b2243 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -1300,12 +1300,16 @@ class Collection(RichCollectionBase):
           storage class.
 
         """
+
+        if storage_classes_desired and type(storage_classes_desired) is not list:
+            raise errors.ArgumentError("storage_classes_desired must be list type.")
+
         super(Collection, self).__init__(parent)
         self._api_client = api_client
         self._keep_client = keep_client
         self._block_manager = block_manager
         self.replication_desired = replication_desired
-        self.storage_classes_desired = storage_classes_desired
+        self._storage_classes_desired = storage_classes_desired
         self.put_threads = put_threads
 
         if apiconfig:
@@ -1343,6 +1347,9 @@ class Collection(RichCollectionBase):
             except (IOError, errors.SyntaxError) as e:
                 raise errors.ArgumentError("Error processing manifest text: %s", e)
 
+    def storage_classes_desired(self):
+        return self._storage_classes_desired or []
+
     def root_collection(self):
         return self
 
@@ -1418,7 +1425,7 @@ class Collection(RichCollectionBase):
                       self._my_api()._rootDesc.get('defaultCollectionReplication',
                                                    2))
             classes = self.storage_classes_desired or []
-            self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads, num_retries=self.num_retries, storage_classes=classes)
+            self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads, num_retries=self.num_retries, storage_classes_func=self.storage_classes_desired)
         return self._block_manager
 
     def _remember_api_response(self, response):
@@ -1442,8 +1449,8 @@ class Collection(RichCollectionBase):
         # replication_desired and storage_classes_desired from the API server
         if self.replication_desired is None:
             self.replication_desired = self._api_response.get('replication_desired', None)
-        if self.storage_classes_desired is None:
-            self.storage_classes_desired = self._api_response.get('storage_classes_desired', None)
+        if self._storage_classes_desired is None:
+            self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)
 
     def _populate(self):
         if self._manifest_text is None:
@@ -1583,14 +1590,9 @@ class Collection(RichCollectionBase):
         body={}
         if properties:
             body["properties"] = properties
-        desired_classes = storage_classes
-        # Instance level storage_classes takes precedence over argument.
-        if self.storage_classes_desired:
-            if desired_classes and self.storage_classes_desired != desired_classes:
-                _logger.warning("Storage classes already set to {}".format(self.storage_classes_desired))
-            desired_classes = self.storage_classes_desired
-        if desired_classes:
-            body["storage_classes_desired"] = desired_classes
+        if storage_classes:
+            self._storage_classes_desired = storage_classes
+            body["storage_classes_desired"] = self.storage_classes_desired()
         if trash_at:
             t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
             body["trash_at"] = t
@@ -1693,6 +1695,9 @@ class Collection(RichCollectionBase):
             self._copy_remote_blocks(remote_blocks={})
             self._has_remote_blocks = False
 
+        if storage_classes:
+            self._storage_classes_desired = storage_classes
+
         self._my_block_manager().commit_all()
         text = self.manifest_text(strip=False)
 
@@ -1708,14 +1713,8 @@ class Collection(RichCollectionBase):
                 body["owner_uuid"] = owner_uuid
             if properties:
                 body["properties"] = properties
-            desired_classes = storage_classes
-            # Instance level storage_classes takes precedence over argument.
-            if self.storage_classes_desired:
-                if desired_classes and self.storage_classes_desired != desired_classes:
-                    _logger.warning("Storage classes already set to {}".format(self.storage_classes_desired))
-                desired_classes = self.storage_classes_desired
-            if desired_classes:
-                body["storage_classes_desired"] = desired_classes
+            if self.storage_classes_desired():
+                body["storage_classes_desired"] = self.storage_classes_desired()
             if trash_at:
                 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
                 body["trash_at"] = t
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index eaf68ec1f..f821ff952 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -915,7 +915,20 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
         loc = c1.manifest_locator()
         c2 = Collection(loc)
         self.assertEqual(c1.manifest_text, c2.manifest_text)
-        self.assertEqual(c1.storage_classes_desired, c2.storage_classes_desired)
+        self.assertEqual(c1.storage_classes_desired(), c2.storage_classes_desired())
+
+    def test_storage_classes_change_after_save(self):
+        m = '. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt 0:10:count2.txt\n'
+        c1 = Collection(m, storage_classes_desired=['archival'])
+        c1.save_new()
+        loc = c1.manifest_locator()
+        c2 = Collection(loc)
+        self.assertEqual(['archival'], c2.storage_classes_desired())
+        c2.save(storage_classes=['highIO'])
+        self.assertEqual(['highIO'], c2.storage_classes_desired())
+        c3 = Collection(loc)
+        self.assertEqual(c1.manifest_text, c3.manifest_text)
+        self.assertEqual(['highIO'], c3.storage_classes_desired())
 
     def test_storage_classes_desired_not_loaded_if_provided(self):
         m = '. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt 0:10:count2.txt\n'
@@ -924,7 +937,7 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
         loc = c1.manifest_locator()
         c2 = Collection(loc, storage_classes_desired=['default'])
         self.assertEqual(c1.manifest_text, c2.manifest_text)
-        self.assertNotEqual(c1.storage_classes_desired, c2.storage_classes_desired)
+        self.assertNotEqual(c1.storage_classes_desired(), c2.storage_classes_desired())
 
     def test_init_manifest(self):
         m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list