[ARVADOS] created: 1.2.0-187-g4c669f3d7

Git user git at public.curoverse.com
Tue Oct 30 08:20:33 EDT 2018


        at  4c669f3d78a990863c5c053f98f790e027da5a96 (commit)


commit 4c669f3d78a990863c5c053f98f790e027da5a96
Author: Lucas Di Pentima <ldipentima at veritasgenetics.com>
Date:   Tue Oct 30 09:19:49 2018 -0300

    14259: Collection class copies remote blocks when saving.
    
    Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima at veritasgenetics.com>

diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index e38a6bd47..55797bdfe 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -1024,6 +1024,34 @@ class RichCollectionBase(CollectionBase):
                 return self._manifest_text
 
     @synchronized
+    def _copy_remote_blocks(self, remote_blocks={}):
+        """Scan through the entire collection and ask Keep to copy remote blocks.
+
+        When accessing a remote collection, blocks will have a remote signature
+        (+R instead of +A). Collect these signatures and request Keep to copy the
+        blocks to the local cluster, returning local (+A) signatures.
+
+        :remote_blocks:
+          Shared cache of remote to local block mappings. This is used to avoid
+          doing extra work when blocks are shared by more than one file in
+          different subdirectories.
+
+        """
+        for filename in [f for f in self.keys() if isinstance(self[f], ArvadosFile)]:
+            for s in self[filename].segments():
+                if '+R' in s.locator:
+                    try:
+                        loc = remote_blocks[s.locator]
+                    except KeyError:
+                        loc = self._my_keep().refresh_signature(s.locator)
+                        remote_blocks[s.locator] = loc
+                    s.locator = loc
+                    self.set_committed(False)
+        for dirname in [d for d in self.keys() if isinstance(self[d], RichCollectionBase)]:
+            remote_blocks = self[dirname]._copy_remote_blocks(remote_blocks)
+        return remote_blocks
+
+    @synchronized
     def diff(self, end_collection, prefix=".", holding_collection=None):
         """Generate list of add/modify/delete actions.
 
@@ -1376,6 +1404,10 @@ class Collection(RichCollectionBase):
     def _has_collection_uuid(self):
         return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
 
+    def _has_local_collection_uuid(self):
+        return self._has_collection_uuid and \
+            self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
+
     def __enter__(self):
         return self
 
@@ -1504,9 +1536,14 @@ class Collection(RichCollectionBase):
             t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
             body["trash_at"] = t
 
+        # Copy any remote blocks to the local cluster.
+        self._copy_remote_blocks(remote_blocks={})
+
         if not self.committed():
             if not self._has_collection_uuid():
                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
+            elif not self._has_local_collection_uuid():
+                raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
 
             self._my_block_manager().commit_all()
 
@@ -1591,6 +1628,9 @@ class Collection(RichCollectionBase):
         if trash_at and type(trash_at) is not datetime.datetime:
             raise errors.ArgumentError("trash_at must be datetime type.")
 
+        # Copy any remote blocks to the local cluster.
+        self._copy_remote_blocks(remote_blocks={})
+
         self._my_block_manager().commit_all()
         text = self.manifest_text(strip=False)
 
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index 722cc5604..491f95d9d 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -1162,11 +1162,14 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
 class NewCollectionTestCaseWithServersAndTokens(run_test_server.TestCaseWithServers):
     MAIN_SERVER = {}
     KEEP_SERVER = {}
+    local_locator_re = r"[0-9a-f]{32}\+\d+\+A[a-f0-9]{40}@[a-f0-9]{8}"
+    remote_locator_re = r"[0-9a-f]{32}\+\d+\+R[a-z]{5}-[a-f0-9]{40}@[a-f0-9]{8}"
 
     def setUp(self):
         self.keep_put = getattr(arvados.keep.KeepClient, 'put')
 
-    def test_repacked_block_submission_get_permission_token(self):
+    @mock.patch('arvados.keep.KeepClient.put', autospec=True)
+    def test_repacked_block_submission_get_permission_token(self, mocked_put):
         '''
         Make sure that those blocks that are committed after repacking small ones,
         get their permission tokens assigned on the collection manifest.
@@ -1176,19 +1179,58 @@ class NewCollectionTestCaseWithServersAndTokens(run_test_server.TestCaseWithServ
             time.sleep(1)
             return self.keep_put(*args, **kwargs)
 
-        re_locator = "[0-9a-f]{32}\+\d+\+A[a-f0-9]{40}@[a-f0-9]{8}"
-
-        with mock.patch('arvados.keep.KeepClient.put', autospec=True) as mocked_put:
-            mocked_put.side_effect = wrapped_keep_put
-            c = Collection()
-            # Write 70 files ~1MiB each so we force to produce 1 big block by repacking
-            # small ones before finishing the upload.
-            for i in range(70):
-                f = c.open("file_{}.txt".format(i), 'wb')
-                f.write(random.choice('abcdefghijklmnopqrstuvwxyz') * (2**20+i))
-                f.close(flush=False)
-            # We should get 2 blocks with their tokens
-            self.assertEqual(len(re.findall(re_locator, c.manifest_text())), 2)
+        mocked_put.side_effect = wrapped_keep_put
+        c = Collection()
+        # Write 70 files ~1MiB each so we force to produce 1 big block by repacking
+        # small ones before finishing the upload.
+        for i in range(70):
+            f = c.open("file_{}.txt".format(i), 'wb')
+            f.write(random.choice('abcdefghijklmnopqrstuvwxyz') * (2**20+i))
+            f.close(flush=False)
+        # We should get 2 blocks with their tokens
+        self.assertEqual(len(re.findall(self.local_locator_re, c.manifest_text())), 2)
+
+    @mock.patch('arvados.keep.KeepClient.refresh_signature')
+    def test_copy_remote_blocks_on_save_new(self, rs_mock):
+        remote_block_loc = "acbd18db4cc2f85cedef654fccc4a4d8+3+Remote-" + "a" * 40 + "@abcdef01"
+        local_block_loc = "acbd18db4cc2f85cedef654fccc4a4d8+3+A" + "b" * 40 + "@abcdef01"
+        rs_mock.return_value = local_block_loc
+        c = Collection(". " + remote_block_loc + " 0:3:foofile.txt\n")
+        self.assertEqual(
+            len(re.findall(self.remote_locator_re, c.manifest_text())), 1)
+        c.save_new()
+        rs_mock.assert_called()
+        self.assertEqual(
+            len(re.findall(self.local_locator_re, c.manifest_text())), 1)
+
+    @mock.patch('arvados.keep.KeepClient.refresh_signature')
+    def test_copy_remote_blocks_on_save(self, rs_mock):
+        remote_block_loc = "acbd18db4cc2f85cedef654fccc4a4d8+3+Remote-" + "a" * 40 + "@abcdef01"
+        local_block_loc = "acbd18db4cc2f85cedef654fccc4a4d8+3+A" + "b" * 40 + "@abcdef01"
+        rs_mock.return_value = local_block_loc
+        # Remote collection
+        remote_c = Collection(". " + remote_block_loc + " 0:3:foofile.txt\n")
+        self.assertEqual(
+            len(re.findall(self.remote_locator_re, remote_c.manifest_text())), 1)
+        # Local collection
+        local_c = Collection()
+        with local_c.open('barfile.txt', 'wb') as f:
+            f.write('bar')
+        local_c.save_new()
+        self.assertEqual(
+            len(re.findall(self.local_locator_re, local_c.manifest_text())), 1)
+        self.assertEqual(
+            len(re.findall(self.remote_locator_re, local_c.manifest_text())), 0)
+        # Copy remote file to local collection
+        local_c.copy('./foofile.txt', './copied/foofile.txt', remote_c)
+        self.assertEqual(
+            len(re.findall(self.remote_locator_re, local_c.manifest_text())), 1)
+        # Save local collection: remote block should be copied
+        local_c.save()
+        self.assertEqual(
+            len(re.findall(self.local_locator_re, local_c.manifest_text())), 2)
+        self.assertEqual(
+            len(re.findall(self.remote_locator_re, local_c.manifest_text())), 0)
 
 
 class NewCollectionTestCaseWithServers(run_test_server.TestCaseWithServers):

commit 303d133e1060e20f40919e347112560e643c5163
Author: Lucas Di Pentima <ldipentima at veritasgenetics.com>
Date:   Thu Oct 25 18:33:53 2018 -0300

    14259: Move & improve test.
    
    Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima at veritasgenetics.com>

diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index a6211e7ea..192a21cd6 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -342,6 +342,25 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
                 mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                 None)
 
+    def test_refresh_signature(self):
+        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
+        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709 at 53bed294'
+        local_loc = blk_digest+'+A'+blk_sig
+        remote_loc = blk_digest+'+R'+blk_sig
+        api_client = self.mock_keep_services(count=1)
+        headers = {'X-Keep-Locator':local_loc}
+        with tutil.mock_keep_responses('', 200, **headers):
+            # Check that the translated locator gets returned
+            keep_client = arvados.KeepClient(api_client=api_client)
+            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
+            # Check that refresh_signature() uses the correct method and headers
+            keep_client._get_or_head = mock.MagicMock()
+            keep_client.refresh_signature(remote_loc)
+            args, kwargs = keep_client._get_or_head.call_args_list[0]
+            self.assertIn(remote_loc, args)
+            self.assertEqual("HEAD", kwargs['method'])
+            self.assertIn('X-Keep-Signature', kwargs['headers'])
+
     # test_*_timeout verify that KeepClient instructs pycurl to use
     # the appropriate connection and read timeouts. They don't care
     # whether pycurl actually exhibits the expected timeout behavior
@@ -935,16 +954,6 @@ class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
         self.assertEqual(True, self.keepClient.head(locator))
 
     @mock.patch('pycurl.Curl')
-    def test_refresh_signature(self, MockCurl):
-        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
-        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709 at 53bed294'
-        MockCurl.return_value = tutil.FakeCurl.make(
-            code=200, body='', headers={'X-Keep-Locator':blk_digest+'+A'+blk_sig})
-        self.mock_disks_and_gateways()
-        locator = blk_digest+'+R'+blk_sig
-        self.assertEqual(blk_digest+'+A'+blk_sig, self.keepClient.refresh_signature(locator))
-
-    @mock.patch('pycurl.Curl')
     def test_get_with_gateway_hints_in_order(self, MockCurl):
         gateways = 4
         disks = 3

commit ca544ed56275747db6423e9da0c34545296db3f5
Author: Lucas Di Pentima <ldipentima at veritasgenetics.com>
Date:   Tue Oct 23 12:50:29 2018 -0300

    14259: Add method to keep client to request remote block copy via HEAD request.
    
    Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima at veritasgenetics.com>

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 71e101cf4..9618ee5ce 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -421,6 +421,10 @@ class KeepClient(object):
                 _logger.info("HEAD %s: %s bytes",
                          self._result['status_code'],
                          self._result.get('content-length'))
+                if self._result['headers'].get('x-keep-locator'):
+                    # This is a response to a remote block copy request, return
+                    # the local copy block locator.
+                    return self._result['headers'].get('x-keep-locator')
                 return True
 
             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
@@ -975,6 +979,11 @@ class KeepClient(object):
         else:
             return None
 
+    def refresh_signature(self, loc):
+        """Ask Keep to get the remote block and return its local signature"""
+        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
+        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
+
     @retry.retry_method
     def head(self, loc_s, **kwargs):
         return self._get_or_head(loc_s, method="HEAD", **kwargs)
@@ -983,7 +992,7 @@ class KeepClient(object):
     def get(self, loc_s, **kwargs):
         return self._get_or_head(loc_s, method="GET", **kwargs)
 
-    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None):
+    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None):
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
@@ -1024,11 +1033,11 @@ class KeepClient(object):
 
             self.misses_counter.add(1)
 
-            headers = {
-                'X-Request-Id': (request_id or
-                                 (hasattr(self, 'api_client') and self.api_client.request_id) or
-                                 arvados.util.new_request_id()),
-            }
+            if headers is None:
+                headers = {}
+            headers['X-Request-Id'] = (request_id or
+                                        (hasattr(self, 'api_client') and self.api_client.request_id) or
+                                        arvados.util.new_request_id())
 
             # If the locator has hints specifying a prefix (indicating a
             # remote keepproxy) or the UUID of a local gateway service,
@@ -1086,7 +1095,7 @@ class KeepClient(object):
             # Always cache the result, then return it if we succeeded.
             if loop.success():
                 if method == "HEAD":
-                    return True
+                    return blob or True
                 else:
                     return blob
         finally:
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index a7b79933b..a6211e7ea 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -935,6 +935,16 @@ class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
         self.assertEqual(True, self.keepClient.head(locator))
 
     @mock.patch('pycurl.Curl')
+    def test_refresh_signature(self, MockCurl):
+        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
+        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709 at 53bed294'
+        MockCurl.return_value = tutil.FakeCurl.make(
+            code=200, body='', headers={'X-Keep-Locator':blk_digest+'+A'+blk_sig})
+        self.mock_disks_and_gateways()
+        locator = blk_digest+'+R'+blk_sig
+        self.assertEqual(blk_digest+'+A'+blk_sig, self.keepClient.refresh_signature(locator))
+
+    @mock.patch('pycurl.Curl')
     def test_get_with_gateway_hints_in_order(self, MockCurl):
         gateways = 4
         disks = 3

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list