[ARVADOS] created: 1.2.0-460-gea36c4ba2
Git user
git at public.curoverse.com
Mon Dec 3 14:36:19 EST 2018
at ea36c4ba2652d8b6996e2c68535c22a7dc1a3807 (commit)
commit ea36c4ba2652d8b6996e2c68535c22a7dc1a3807
Author: Lucas Di Pentima <ldipentima at veritasgenetics.com>
Date: Mon Dec 3 16:23:04 2018 -0300
14012: Early cached manifest validation by doing HEAD req on first block.
* Remove unneeded block expiration check and adapt test for this change
* Add local_store_head to the Keep client for testing purposes
* Fix failing test
Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima at veritasgenetics.com>
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index cba00c3c8..a69dee735 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -31,6 +31,7 @@ import traceback
from apiclient import errors as apiclient_errors
from arvados._version import __version__
+from arvados.util import keep_locator_pattern
import arvados.commands._util as arv_cmd
@@ -289,6 +290,9 @@ class ResumeCacheConflict(Exception):
pass
+class ResumeCacheInvalidError(Exception):
+ pass
+
class ArvPutArgumentConflict(Exception):
pass
@@ -387,7 +391,7 @@ class ResumeCache(object):
new_cache = os.fdopen(new_cache_fd, 'r+')
json.dump(data, new_cache)
os.rename(new_cache_name, self.filename)
- except (IOError, OSError, ResumeCacheConflict) as error:
+ except (IOError, OSError, ResumeCacheConflict):
try:
os.unlink(new_cache_name)
except NameError: # mkstemp failed.
@@ -482,8 +486,8 @@ class ArvPutUploadJob(object):
def _build_upload_list(self):
"""
- Scan the requested paths to count file sizes, excluding files & dirs if requested
- and building the upload file list.
+ Scan the requested paths to count file sizes, excluding requested files
+ and dirs and building the upload file list.
"""
# If there aren't special files to be read, reset total bytes count to zero
# to start counting.
@@ -735,12 +739,6 @@ class ArvPutUploadJob(object):
if not file_in_local_collection:
# File not uploaded yet, upload it completely
should_upload = True
- elif file_in_local_collection.permission_expired():
- # Permission token expired, re-upload file. This will change whenever
- # we have a API for refreshing tokens.
- self.logger.warning("Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
- should_upload = True
- self._local_collection.remove(filename)
elif cached_file_data['size'] == file_in_local_collection.size():
# File already there, skip it.
self.bytes_skipped += cached_file_data['size']
@@ -850,6 +848,8 @@ class ArvPutUploadJob(object):
self.logger.info("No cache usage requested for this run.")
# No cache file, set empty state
self._state = copy.deepcopy(self.EMPTY_STATE)
+ if not self._cached_manifest_valid():
+ raise ResumeCacheInvalidError()
# Load the previous manifest so we can check if files were modified remotely.
self._local_collection = arvados.collection.Collection(
self._state['manifest'],
@@ -857,6 +857,30 @@ class ArvPutUploadJob(object):
put_threads=self.put_threads,
api_client=self._api_client)
+ def _cached_manifest_valid(self):
+ """
+ Validate first block's signature to check if cached manifest is usable.
+ Cached manifest could be invalid because:
+ * Block signature expiration
+ * Block signatures belonging to a different user
+ """
+ if self._state.get('manifest', None) is None:
+ # No cached manifest yet, all good.
+ return True
+ m = keep_locator_pattern.search(self._state['manifest'])
+ if m is None:
+ # No blocks found, no invalid block signatures.
+ return True
+ loc = self._state['manifest'][m.start():m.end()]
+ kc = arvados.KeepClient(api_client=self._api_client,
+ num_retries=self.num_retries)
+ try:
+ kc.head(loc)
+ except arvados.errors.KeepRequestError:
+ # Something is wrong, cached manifest is not valid.
+ return False
+ return True
+
def collection_file_paths(self, col, path_prefix='.'):
"""Return a list of file paths by recursively go through the entire collection `col`"""
file_paths = []
@@ -1131,6 +1155,13 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
"arv-put: Another process is already uploading this data.",
" Use --no-cache if this is really what you want."]))
sys.exit(1)
+ except ResumeCacheInvalidError:
+ logger.error("\n".join([
+ "arv-put: Cache seems to contain invalid data: it may have expired",
+ " or been created with another arvados user's credentials.",
+ " Use --no-resume to start a new cache file.",
+ " Use --no-cache to skip current cache file usage."]))
+ sys.exit(1)
except CollectionUpdateError as error:
logger.error("\n".join([
"arv-put: %s" % str(error)]))
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 1b6376e9b..4354ced67 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -791,6 +791,7 @@ class KeepClient(object):
if local_store:
self.local_store = local_store
+ self.head = self.local_store_head
self.get = self.local_store_get
self.put = self.local_store_put
else:
@@ -1230,5 +1231,17 @@ class KeepClient(object):
with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
return f.read()
+ def local_store_head(self, loc_s, num_retries=None):
+ """Companion to local_store_put()."""
+ try:
+ locator = KeepLocator(loc_s)
+ except ValueError:
+ raise arvados.errors.NotFoundError(
+ "Invalid data locator: '%s'" % loc_s)
+ if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
+ return True
+ if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
+ return True
+
def is_cached(self, locator):
return self.block_cache.reserve_cache(expect_hash)
diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
index 93cfdc2a3..76144e8e3 100644
--- a/sdk/python/tests/test_arv_put.py
+++ b/sdk/python/tests/test_arv_put.py
@@ -549,7 +549,7 @@ class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
writer.bytes_expected)
def test_expected_bytes_for_device(self):
- writer = arv_put.ArvPutUploadJob(['/dev/null'])
+ writer = arv_put.ArvPutUploadJob(['/dev/null'], use_cache=False, resume=False)
self.assertIsNone(writer.bytes_expected)
writer = arv_put.ArvPutUploadJob([__file__, '/dev/null'])
self.assertIsNone(writer.bytes_expected)
@@ -974,12 +974,8 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
(out, err) = p.communicate()
self.assertRegex(
err.decode(),
- r'WARNING: Uploaded file .* access token expired, will re-upload it from scratch')
- self.assertEqual(p.returncode, 0)
- # Confirm that the resulting cache is different from the last run.
- with open(cache_filepath, 'r') as c2:
- new_cache = json.load(c2)
- self.assertNotEqual(cache['manifest'], new_cache['manifest'])
+ r'ERROR: arv-put: Cache seems to contain invalid data.*')
+ self.assertEqual(p.returncode, 1)
def test_put_collection_with_later_update(self):
tmpdir = self.make_tmpdir()
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list