[ARVADOS] created: 1.2.0-187-g4c669f3d7
Git user
git at public.curoverse.com
Tue Oct 30 08:20:33 EDT 2018
at 4c669f3d78a990863c5c053f98f790e027da5a96 (commit)
commit 4c669f3d78a990863c5c053f98f790e027da5a96
Author: Lucas Di Pentima <ldipentima at veritasgenetics.com>
Date: Tue Oct 30 09:19:49 2018 -0300
14259: Collection class copies remote blocks when saving.
Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima at veritasgenetics.com>
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index e38a6bd47..55797bdfe 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -1024,6 +1024,34 @@ class RichCollectionBase(CollectionBase):
return self._manifest_text
@synchronized
+ def _copy_remote_blocks(self, remote_blocks={}):
+ """Scan through the entire collection and ask Keep to copy remote blocks.
+
+ When accessing a remote collection, blocks will have a remote signature
+ (+R instead of +A). Collect these signatures and request Keep to copy the
+ blocks to the local cluster, returning local (+A) signatures.
+
+ :remote_blocks:
+ Shared cache of remote to local block mappings. This is used to avoid
+ doing extra work when blocks are shared by more than one file in
+ different subdirectories.
+
+ """
+ for filename in [f for f in self.keys() if isinstance(self[f], ArvadosFile)]:
+ for s in self[filename].segments():
+ if '+R' in s.locator:
+ try:
+ loc = remote_blocks[s.locator]
+ except KeyError:
+ loc = self._my_keep().refresh_signature(s.locator)
+ remote_blocks[s.locator] = loc
+ s.locator = loc
+ self.set_committed(False)
+ for dirname in [d for d in self.keys() if isinstance(self[d], RichCollectionBase)]:
+ remote_blocks = self[dirname]._copy_remote_blocks(remote_blocks)
+ return remote_blocks
+
+ @synchronized
def diff(self, end_collection, prefix=".", holding_collection=None):
"""Generate list of add/modify/delete actions.
@@ -1376,6 +1404,10 @@ class Collection(RichCollectionBase):
def _has_collection_uuid(self):
return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
+ def _has_local_collection_uuid(self):
+ return self._has_collection_uuid and \
+ self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
+
def __enter__(self):
return self
@@ -1504,9 +1536,14 @@ class Collection(RichCollectionBase):
t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
body["trash_at"] = t
+ # Copy any remote blocks to the local cluster.
+ self._copy_remote_blocks(remote_blocks={})
+
if not self.committed():
if not self._has_collection_uuid():
raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
+ elif not self._has_local_collection_uuid():
+ raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
self._my_block_manager().commit_all()
@@ -1591,6 +1628,9 @@ class Collection(RichCollectionBase):
if trash_at and type(trash_at) is not datetime.datetime:
raise errors.ArgumentError("trash_at must be datetime type.")
+ # Copy any remote blocks to the local cluster.
+ self._copy_remote_blocks(remote_blocks={})
+
self._my_block_manager().commit_all()
text = self.manifest_text(strip=False)
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index 722cc5604..491f95d9d 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -1162,11 +1162,14 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
class NewCollectionTestCaseWithServersAndTokens(run_test_server.TestCaseWithServers):
MAIN_SERVER = {}
KEEP_SERVER = {}
+ local_locator_re = r"[0-9a-f]{32}\+\d+\+A[a-f0-9]{40}@[a-f0-9]{8}"
+ remote_locator_re = r"[0-9a-f]{32}\+\d+\+R[a-z]{5}-[a-f0-9]{40}@[a-f0-9]{8}"
def setUp(self):
self.keep_put = getattr(arvados.keep.KeepClient, 'put')
- def test_repacked_block_submission_get_permission_token(self):
+ @mock.patch('arvados.keep.KeepClient.put', autospec=True)
+ def test_repacked_block_submission_get_permission_token(self, mocked_put):
'''
Make sure that those blocks that are committed after repacking small ones,
get their permission tokens assigned on the collection manifest.
@@ -1176,19 +1179,58 @@ class NewCollectionTestCaseWithServersAndTokens(run_test_server.TestCaseWithServ
time.sleep(1)
return self.keep_put(*args, **kwargs)
- re_locator = "[0-9a-f]{32}\+\d+\+A[a-f0-9]{40}@[a-f0-9]{8}"
-
- with mock.patch('arvados.keep.KeepClient.put', autospec=True) as mocked_put:
- mocked_put.side_effect = wrapped_keep_put
- c = Collection()
- # Write 70 files ~1MiB each so we force to produce 1 big block by repacking
- # small ones before finishing the upload.
- for i in range(70):
- f = c.open("file_{}.txt".format(i), 'wb')
- f.write(random.choice('abcdefghijklmnopqrstuvwxyz') * (2**20+i))
- f.close(flush=False)
- # We should get 2 blocks with their tokens
- self.assertEqual(len(re.findall(re_locator, c.manifest_text())), 2)
+ mocked_put.side_effect = wrapped_keep_put
+ c = Collection()
+ # Write 70 files ~1MiB each so we force to produce 1 big block by repacking
+ # small ones before finishing the upload.
+ for i in range(70):
+ f = c.open("file_{}.txt".format(i), 'wb')
+ f.write(random.choice('abcdefghijklmnopqrstuvwxyz') * (2**20+i))
+ f.close(flush=False)
+ # We should get 2 blocks with their tokens
+ self.assertEqual(len(re.findall(self.local_locator_re, c.manifest_text())), 2)
+
+ @mock.patch('arvados.keep.KeepClient.refresh_signature')
+ def test_copy_remote_blocks_on_save_new(self, rs_mock):
+ remote_block_loc = "acbd18db4cc2f85cedef654fccc4a4d8+3+Remote-" + "a" * 40 + "@abcdef01"
+ local_block_loc = "acbd18db4cc2f85cedef654fccc4a4d8+3+A" + "b" * 40 + "@abcdef01"
+ rs_mock.return_value = local_block_loc
+ c = Collection(". " + remote_block_loc + " 0:3:foofile.txt\n")
+ self.assertEqual(
+ len(re.findall(self.remote_locator_re, c.manifest_text())), 1)
+ c.save_new()
+ rs_mock.assert_called()
+ self.assertEqual(
+ len(re.findall(self.local_locator_re, c.manifest_text())), 1)
+
+ @mock.patch('arvados.keep.KeepClient.refresh_signature')
+ def test_copy_remote_blocks_on_save(self, rs_mock):
+ remote_block_loc = "acbd18db4cc2f85cedef654fccc4a4d8+3+Remote-" + "a" * 40 + "@abcdef01"
+ local_block_loc = "acbd18db4cc2f85cedef654fccc4a4d8+3+A" + "b" * 40 + "@abcdef01"
+ rs_mock.return_value = local_block_loc
+ # Remote collection
+ remote_c = Collection(". " + remote_block_loc + " 0:3:foofile.txt\n")
+ self.assertEqual(
+ len(re.findall(self.remote_locator_re, remote_c.manifest_text())), 1)
+ # Local collection
+ local_c = Collection()
+ with local_c.open('barfile.txt', 'wb') as f:
+ f.write('bar')
+ local_c.save_new()
+ self.assertEqual(
+ len(re.findall(self.local_locator_re, local_c.manifest_text())), 1)
+ self.assertEqual(
+ len(re.findall(self.remote_locator_re, local_c.manifest_text())), 0)
+ # Copy remote file to local collection
+ local_c.copy('./foofile.txt', './copied/foofile.txt', remote_c)
+ self.assertEqual(
+ len(re.findall(self.remote_locator_re, local_c.manifest_text())), 1)
+ # Save local collection: remote block should be copied
+ local_c.save()
+ self.assertEqual(
+ len(re.findall(self.local_locator_re, local_c.manifest_text())), 2)
+ self.assertEqual(
+ len(re.findall(self.remote_locator_re, local_c.manifest_text())), 0)
class NewCollectionTestCaseWithServers(run_test_server.TestCaseWithServers):
commit 303d133e1060e20f40919e347112560e643c5163
Author: Lucas Di Pentima <ldipentima at veritasgenetics.com>
Date: Thu Oct 25 18:33:53 2018 -0300
14259: Move & improve test.
Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima at veritasgenetics.com>
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index a6211e7ea..192a21cd6 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -342,6 +342,25 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
None)
+ def test_refresh_signature(self):
+ blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
+ blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
+ local_loc = blk_digest+'+A'+blk_sig
+ remote_loc = blk_digest+'+R'+blk_sig
+ api_client = self.mock_keep_services(count=1)
+ headers = {'X-Keep-Locator':local_loc}
+ with tutil.mock_keep_responses('', 200, **headers):
+ # Check that the translated locator gets returned
+ keep_client = arvados.KeepClient(api_client=api_client)
+ self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
+ # Check that refresh_signature() uses the correct method and headers
+ keep_client._get_or_head = mock.MagicMock()
+ keep_client.refresh_signature(remote_loc)
+ args, kwargs = keep_client._get_or_head.call_args_list[0]
+ self.assertIn(remote_loc, args)
+ self.assertEqual("HEAD", kwargs['method'])
+ self.assertIn('X-Keep-Signature', kwargs['headers'])
+
# test_*_timeout verify that KeepClient instructs pycurl to use
# the appropriate connection and read timeouts. They don't care
# whether pycurl actually exhibits the expected timeout behavior
@@ -935,16 +954,6 @@ class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
self.assertEqual(True, self.keepClient.head(locator))
@mock.patch('pycurl.Curl')
- def test_refresh_signature(self, MockCurl):
- blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
- blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
- MockCurl.return_value = tutil.FakeCurl.make(
- code=200, body='', headers={'X-Keep-Locator':blk_digest+'+A'+blk_sig})
- self.mock_disks_and_gateways()
- locator = blk_digest+'+R'+blk_sig
- self.assertEqual(blk_digest+'+A'+blk_sig, self.keepClient.refresh_signature(locator))
-
- @mock.patch('pycurl.Curl')
def test_get_with_gateway_hints_in_order(self, MockCurl):
gateways = 4
disks = 3
commit ca544ed56275747db6423e9da0c34545296db3f5
Author: Lucas Di Pentima <ldipentima at veritasgenetics.com>
Date: Tue Oct 23 12:50:29 2018 -0300
14259: Add method to keep client to request remote block copy via HEAD request.
Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima at veritasgenetics.com>
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 71e101cf4..9618ee5ce 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -421,6 +421,10 @@ class KeepClient(object):
_logger.info("HEAD %s: %s bytes",
self._result['status_code'],
self._result.get('content-length'))
+ if self._result['headers'].get('x-keep-locator'):
+ # This is a response to a remote block copy request, return
+ # the local copy block locator.
+ return self._result['headers'].get('x-keep-locator')
return True
_logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
@@ -975,6 +979,11 @@ class KeepClient(object):
else:
return None
+ def refresh_signature(self, loc):
+ """Ask Keep to get the remote block and return its local signature"""
+ now = datetime.datetime.utcnow().isoformat("T") + 'Z'
+ return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
+
@retry.retry_method
def head(self, loc_s, **kwargs):
return self._get_or_head(loc_s, method="HEAD", **kwargs)
@@ -983,7 +992,7 @@ class KeepClient(object):
def get(self, loc_s, **kwargs):
return self._get_or_head(loc_s, method="GET", **kwargs)
- def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None):
+ def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None):
"""Get data from Keep.
This method fetches one or more blocks of data from Keep. It
@@ -1024,11 +1033,11 @@ class KeepClient(object):
self.misses_counter.add(1)
- headers = {
- 'X-Request-Id': (request_id or
- (hasattr(self, 'api_client') and self.api_client.request_id) or
- arvados.util.new_request_id()),
- }
+ if headers is None:
+ headers = {}
+ headers['X-Request-Id'] = (request_id or
+ (hasattr(self, 'api_client') and self.api_client.request_id) or
+ arvados.util.new_request_id())
# If the locator has hints specifying a prefix (indicating a
# remote keepproxy) or the UUID of a local gateway service,
@@ -1086,7 +1095,7 @@ class KeepClient(object):
# Always cache the result, then return it if we succeeded.
if loop.success():
if method == "HEAD":
- return True
+ return blob or True
else:
return blob
finally:
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index a7b79933b..a6211e7ea 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -935,6 +935,16 @@ class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
self.assertEqual(True, self.keepClient.head(locator))
@mock.patch('pycurl.Curl')
+ def test_refresh_signature(self, MockCurl):
+ blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
+ blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
+ MockCurl.return_value = tutil.FakeCurl.make(
+ code=200, body='', headers={'X-Keep-Locator':blk_digest+'+A'+blk_sig})
+ self.mock_disks_and_gateways()
+ locator = blk_digest+'+R'+blk_sig
+ self.assertEqual(blk_digest+'+A'+blk_sig, self.keepClient.refresh_signature(locator))
+
+ @mock.patch('pycurl.Curl')
def test_get_with_gateway_hints_in_order(self, MockCurl):
gateways = 4
disks = 3
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list