[ARVADOS] created: 1.2.0-460-gea36c4ba2

Git user git at public.curoverse.com
Mon Dec 3 14:36:19 EST 2018


        at  ea36c4ba2652d8b6996e2c68535c22a7dc1a3807 (commit)


commit ea36c4ba2652d8b6996e2c68535c22a7dc1a3807
Author: Lucas Di Pentima <ldipentima at veritasgenetics.com>
Date:   Mon Dec 3 16:23:04 2018 -0300

    14012: Early cached manifest validation by doing a HEAD request on the first block.
    
    * Remove the unneeded block expiration check and adapt its test for this change
    * Add local_store_head() to the Keep client for testing purposes
    * Fix a failing test
    
    Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima at veritasgenetics.com>
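
For context, a minimal sketch of the check this patch introduces (illustrative
only, not part of the diff below; first_block_is_readable, manifest_text and
api_client are placeholder names, while the SDK calls mirror the ones used in
_cached_manifest_valid()):

    # Sketch: validate a cached manifest by HEAD-requesting its first block.
    import arvados
    import arvados.errors
    from arvados.util import keep_locator_pattern

    def first_block_is_readable(manifest_text, api_client, num_retries=2):
        match = keep_locator_pattern.search(manifest_text)
        if match is None:
            return True  # manifest references no blocks, nothing to validate
        kc = arvados.KeepClient(api_client=api_client, num_retries=num_retries)
        try:
            kc.head(manifest_text[match.start():match.end()])
        except arvados.errors.KeepRequestError:
            return False  # signature expired or signed for another user
        return True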

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index cba00c3c8..a69dee735 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -31,6 +31,7 @@ import traceback
 
 from apiclient import errors as apiclient_errors
 from arvados._version import __version__
+from arvados.util import keep_locator_pattern
 
 import arvados.commands._util as arv_cmd
 
@@ -289,6 +290,9 @@ class ResumeCacheConflict(Exception):
     pass
 
 
+class ResumeCacheInvalidError(Exception):
+    pass
+
 class ArvPutArgumentConflict(Exception):
     pass
 
@@ -387,7 +391,7 @@ class ResumeCache(object):
             new_cache = os.fdopen(new_cache_fd, 'r+')
             json.dump(data, new_cache)
             os.rename(new_cache_name, self.filename)
-        except (IOError, OSError, ResumeCacheConflict) as error:
+        except (IOError, OSError, ResumeCacheConflict):
             try:
                 os.unlink(new_cache_name)
             except NameError:  # mkstemp failed.
@@ -482,8 +486,8 @@ class ArvPutUploadJob(object):
 
     def _build_upload_list(self):
         """
-        Scan the requested paths to count file sizes, excluding files & dirs if requested
-        and building the upload file list.
+        Scan the requested paths to count file sizes, excluding requested files
+        and dirs and building the upload file list.
         """
         # If there aren't special files to be read, reset total bytes count to zero
         # to start counting.
@@ -735,12 +739,6 @@ class ArvPutUploadJob(object):
             if not file_in_local_collection:
                 # File not uploaded yet, upload it completely
                 should_upload = True
-            elif file_in_local_collection.permission_expired():
-                # Permission token expired, re-upload file. This will change whenever
-                # we have a API for refreshing tokens.
-                self.logger.warning("Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
-                should_upload = True
-                self._local_collection.remove(filename)
             elif cached_file_data['size'] == file_in_local_collection.size():
                 # File already there, skip it.
                 self.bytes_skipped += cached_file_data['size']
@@ -850,6 +848,8 @@ class ArvPutUploadJob(object):
                 self.logger.info("No cache usage requested for this run.")
                 # No cache file, set empty state
                 self._state = copy.deepcopy(self.EMPTY_STATE)
+            if not self._cached_manifest_valid():
+                raise ResumeCacheInvalidError()
             # Load the previous manifest so we can check if files were modified remotely.
             self._local_collection = arvados.collection.Collection(
                 self._state['manifest'],
@@ -857,6 +857,30 @@ class ArvPutUploadJob(object):
                 put_threads=self.put_threads,
                 api_client=self._api_client)
 
+    def _cached_manifest_valid(self):
+        """
+        Validate first block's signature to check if cached manifest is usable.
+        Cached manifest could be invalid because:
+        * Block signature expiration
+        * Block signatures belonging to a different user
+        """
+        if self._state.get('manifest', None) is None:
+            # No cached manifest yet, all good.
+            return True
+        m = keep_locator_pattern.search(self._state['manifest'])
+        if m is None:
+            # No blocks found, no invalid block signatures.
+            return True
+        loc = self._state['manifest'][m.start():m.end()]
+        kc = arvados.KeepClient(api_client=self._api_client,
+                                num_retries=self.num_retries)
+        try:
+            kc.head(loc)
+        except arvados.errors.KeepRequestError:
+            # Something is wrong, cached manifest is not valid.
+            return False
+        return True
+
     def collection_file_paths(self, col, path_prefix='.'):
         """Return a list of file paths by recursively go through the entire collection `col`"""
         file_paths = []
@@ -1131,6 +1155,13 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
             "arv-put: Another process is already uploading this data.",
             "         Use --no-cache if this is really what you want."]))
         sys.exit(1)
+    except ResumeCacheInvalidError:
+        logger.error("\n".join([
+            "arv-put: Cache seems to contain invalid data: it may have expired",
+            "         or been created with another arvados user's credentials.",
+            "         Use --no-resume to start a new cache file.",
+            "         Use --no-cache to skip current cache file usage."]))
+        sys.exit(1)
     except CollectionUpdateError as error:
         logger.error("\n".join([
             "arv-put: %s" % str(error)]))
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 1b6376e9b..4354ced67 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -791,6 +791,7 @@ class KeepClient(object):
 
         if local_store:
             self.local_store = local_store
+            self.head = self.local_store_head
             self.get = self.local_store_get
             self.put = self.local_store_put
         else:
@@ -1230,5 +1231,17 @@ class KeepClient(object):
         with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
             return f.read()
 
+    def local_store_head(self, loc_s, num_retries=None):
+        """Companion to local_store_put()."""
+        try:
+            locator = KeepLocator(loc_s)
+        except ValueError:
+            raise arvados.errors.NotFoundError(
+                "Invalid data locator: '%s'" % loc_s)
+        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
+            return True
+        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
+            return True
+
     def is_cached(self, locator):
         return self.block_cache.reserve_cache(expect_hash)
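
A hedged usage example for the new head() alias (illustrative only; it assumes
KeepClient can be constructed in its local-store testing mode, as the hunk
above suggests):

    # Exercising head() in local-store mode, where it dispatches to the new
    # local_store_head().
    import tempfile
    import arvados

    store = tempfile.mkdtemp()                  # throwaway on-disk block store
    kc = arvados.KeepClient(local_store=store)  # no API server needed here
    loc = kc.put(b'example data')               # local_store_put() writes the block
    assert kc.head(loc)                         # local_store_head() finds it
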
diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
index 93cfdc2a3..76144e8e3 100644
--- a/sdk/python/tests/test_arv_put.py
+++ b/sdk/python/tests/test_arv_put.py
@@ -549,7 +549,7 @@ class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
                          writer.bytes_expected)
 
     def test_expected_bytes_for_device(self):
-        writer = arv_put.ArvPutUploadJob(['/dev/null'])
+        writer = arv_put.ArvPutUploadJob(['/dev/null'], use_cache=False, resume=False)
         self.assertIsNone(writer.bytes_expected)
         writer = arv_put.ArvPutUploadJob([__file__, '/dev/null'])
         self.assertIsNone(writer.bytes_expected)
@@ -974,12 +974,8 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
         (out, err) = p.communicate()
         self.assertRegex(
             err.decode(),
-            r'WARNING: Uploaded file .* access token expired, will re-upload it from scratch')
-        self.assertEqual(p.returncode, 0)
-        # Confirm that the resulting cache is different from the last run.
-        with open(cache_filepath, 'r') as c2:
-            new_cache = json.load(c2)
-        self.assertNotEqual(cache['manifest'], new_cache['manifest'])
+            r'ERROR: arv-put: Cache seems to contain invalid data.*')
+        self.assertEqual(p.returncode, 1)
 
     def test_put_collection_with_later_update(self):
         tmpdir = self.make_tmpdir()

-----------------------------------------------------------------------

