[ARVADOS] updated: 3885bc2e043ce123929acceaaa8dbbf20dbb8c12

Git user git at public.curoverse.com
Fri Nov 11 21:42:33 EST 2016


Summary of changes:
 sdk/python/arvados/collection.py     |  11 ++-
 sdk/python/arvados/commands/put.py   | 165 +++++++++++++++++++++++------------
 sdk/python/tests/test_arv_put.py     |  14 +--
 sdk/python/tests/test_collections.py |   2 +
 4 files changed, 127 insertions(+), 65 deletions(-)

       via  3885bc2e043ce123929acceaaa8dbbf20dbb8c12 (commit)
       via  3dec7045b24acdd53dc054bddcfd4c7f77739f00 (commit)
       via  845d6e6521b284a111534447919a6bc594573ee1 (commit)
      from  391a1d8378b4bc6b17b71904b3d6494160b51627 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 3885bc2e043ce123929acceaaa8dbbf20dbb8c12
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Fri Nov 11 21:37:30 2016 -0300

    10383: Tests have been adapted to the new behaviour.

diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
index 898349c..e4b2724 100644
--- a/sdk/python/tests/test_arv_put.py
+++ b/sdk/python/tests/test_arv_put.py
@@ -271,7 +271,7 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
 
     def test_writer_works_without_cache(self):
         cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
-        cwriter.start()
+        cwriter.start(save_collection=False)
         self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
 
     def test_writer_works_with_cache(self):
@@ -279,11 +279,11 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
             f.write('foo')
             f.flush()
             cwriter = arv_put.ArvPutUploadJob([f.name])
-            cwriter.start()
+            cwriter.start(save_collection=False)
             self.assertEqual(3, cwriter.bytes_written)
             # Don't destroy the cache, and start another upload
             cwriter_new = arv_put.ArvPutUploadJob([f.name])
-            cwriter_new.start()
+            cwriter_new.start(save_collection=False)
             cwriter_new.destroy_cache()
             self.assertEqual(0, cwriter_new.bytes_written)
 
@@ -301,13 +301,13 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                 progression, reporter = self.make_progress_tester()
                 cwriter = arv_put.ArvPutUploadJob([f.name],
                     reporter=reporter, bytes_expected=expect_count)
-                cwriter.start()
+                cwriter.start(save_collection=False)
                 cwriter.destroy_cache()
                 self.assertIn((3, expect_count), progression)
 
     def test_writer_upload_directory(self):
         cwriter = arv_put.ArvPutUploadJob([self.tempdir])
-        cwriter.start()
+        cwriter.start(save_collection=False)
         cwriter.destroy_cache()
         self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)
 
@@ -325,13 +325,13 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
             writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                              replication_desired=1)
             with self.assertRaises(SystemExit):
-                writer.start()
+                writer.start(save_collection=False)
                 self.assertLess(writer.bytes_written,
                                 os.path.getsize(self.large_file_name))
         # Retry the upload
         writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                           replication_desired=1)
-        writer2.start()
+        writer2.start(save_collection=False)
         self.assertEqual(writer.bytes_written + writer2.bytes_written,
                          os.path.getsize(self.large_file_name))
         writer2.destroy_cache()

commit 3dec7045b24acdd53dc054bddcfd4c7f77739f00
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Fri Nov 11 20:38:02 2016 -0300

    10383: Enhancements/fixes to the collection update feature:
    * The cache file saves the collection UUID when uploading for the first time.
    * On initial upload success, the cache file is not deleted, so it can be used for updating the collection.
    * Once the initial upload is done, the manifest saved in the cache file is used to detect remote changes to each file; if a file was modified remotely, the local file is re-uploaded completely.
    * If there were no remote changes since the last upload/update, new files are uploaded and partial uploads are resumed.
    * If a new upload (to create a new collection) is attempted without deleting the previous cache file, a message will alert the user and the operation will be cancelled.

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index e428eaa..36e9f6c 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -98,9 +98,9 @@ manifest.
 """)
 
 _group.add_argument('--update-collection', type=str, default=None,
-                    dest='update_collection', help="""
+                    dest='update_collection', metavar="UUID", help="""
 Update an existing collection identified by the given Arvados collection
-UUID or manifest block locator. All new local files will be uploaded.
+UUID. All new local files will be uploaded.
 """)
 
 upload_opts.add_argument('--use-filename', type=str, default=None,
@@ -210,6 +210,10 @@ class CollectionUpdateError(Exception):
     pass
 
 
+class ResumeCacheInvalid(Exception):
+    pass
+
+
 class ResumeCacheConflict(Exception):
     pass
 
@@ -300,7 +304,8 @@ class ArvPutUploadJob(object):
     CACHE_DIR = '.cache/arvados/arv-put'
     EMPTY_STATE = {
         'manifest' : None, # Last saved manifest checkpoint
-        'files' : {} # Previous run file list: {path : {size, mtime}}
+        'files' : {}, # Previous run file list: {path : {size, mtime}}
+        'collection_uuid': None, # Saved collection's UUID
     }
 
     def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
@@ -309,7 +314,7 @@ class ArvPutUploadJob(object):
                  filename=None, update_time=1.0, update_collection=None):
         self.paths = paths
         self.resume = resume
-        self.update_collection = False
+        self.update = False
         self.reporter = reporter
         self.bytes_expected = bytes_expected
         self.bytes_written = 0
@@ -326,27 +331,39 @@ class ArvPutUploadJob(object):
         self._cache_file = None
         self._collection = None
         self._collection_lock = threading.Lock()
+        self._local_collection = None # Previous collection manifest, used on update mode.
         self._stop_checkpointer = threading.Event()
         self._checkpointer = threading.Thread(target=self._update_task)
         self._update_task_time = update_time  # How many seconds wait between update runs
         self.logger = logging.getLogger('arvados.arv_put')
+
         # Load an already existing collection for update
-        if update_collection:
-            if re.match(arvados.util.keep_locator_pattern, update_collection) or re.match(arvados.util.collection_uuid_pattern, update_collection):
-                try:
-                    self._collection = arvados.collection.Collection(update_collection)
-                except arvados.errors.ApiError as error:
-                    raise CollectionUpdateError("Cannot update collection {} ({})".format(update_collection, error))
-                else:
-                    self.update_collection = True
-                    self.resume = True
+        if update_collection and re.match(arvados.util.collection_uuid_pattern,
+                                          update_collection):
+            try:
+                self._collection = arvados.collection.Collection(update_collection)
+            except arvados.errors.ApiError as error:
+                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
             else:
-                # Collection locator  provided, but unknown format
-                raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
+                self.update = True
+                self.resume = True
+        elif update_collection:
+            # Collection locator provided, but unknown format
+            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
         # Load cached data if any and if needed
-        self._setup_state(update_collection)
-
-    def start(self):
+        self._setup_state()
+        if self.update:
+            # Check if cache data belongs to the collection needed to be updated
+            if self._state['collection_uuid'] is None:
+                raise CollectionUpdateError("An initial upload is needed before being able to update the collection {}.".format(update_collection))
+            elif self._state['collection_uuid'] != update_collection:
+                raise CollectionUpdateError("Cached data doesn't belong to collection {}".format(update_collection))
+        elif self.resume:
+            # Check that cache data doesn't belong to an already created collection
+            if self._state['collection_uuid'] is not None:
+                raise ResumeCacheInvalid("Resume cache file '{}' belongs to existing collection {}".format(self._cache_filename, self._state['collection_uuid']))
+
+    def start(self, save_collection):
         """
         Start supporting thread & file uploading
         """
@@ -367,6 +384,10 @@ class ArvPutUploadJob(object):
             self._checkpointer.join()
             # Commit all & one last _update()
             self.manifest_text()
+            if save_collection:
+                self.save_collection()
+                with self._state_lock:
+                    self._state['collection_uuid'] = self._my_collection().manifest_locator()
             self._update()
             if self.resume:
                 self._cache_file.close()
@@ -375,7 +396,7 @@ class ArvPutUploadJob(object):
 
     def save_collection(self):
         with self._collection_lock:
-            if self.update_collection:
+            if self.update:
                 self._my_collection().save(num_retries = self.num_retries)
             else:
                 self._my_collection().save_new(
@@ -443,12 +464,15 @@ class ArvPutUploadJob(object):
         # if stream_name == '.':
         #     stream_name = os.path.join('.', os.path.basename(path))
         for item in os.listdir(path):
+            item_col_path = os.path.join(stream_name, item)
             if os.path.isdir(os.path.join(path, item)):
+                self._my_collection().find_or_create(item_col_path,
+                                arvados.collection.COLLECTION)
                 self._write_directory_tree(os.path.join(path, item),
-                                os.path.join(stream_name, item))
+                                           item_col_path)
             else:
                 self._write_file(os.path.join(path, item),
-                                os.path.join(stream_name, item))
+                                 item_col_path)
 
     def _write_stdin(self, filename):
         with self._collection_lock:
@@ -458,14 +482,9 @@ class ArvPutUploadJob(object):
 
     def _write_file(self, source, filename):
         resume_offset = 0
+        should_upload = True
+
         if self.resume:
-            # Check if file was already uploaded (at least partially)
-            with self._collection_lock:
-                try:
-                    file_in_collection = self._my_collection().find(filename)
-                except IOError:
-                    # Not found
-                    file_in_collection = None
             with self._state_lock:
                 # If no previous cached data on this file, store it for an eventual
                 # repeated run.
@@ -475,29 +494,48 @@ class ArvPutUploadJob(object):
                         'size' : os.path.getsize(source)
                     }
                 cached_file_data = self._state['files'][source]
-            # See if this file was already uploaded at least partially
-            if file_in_collection:
-                # If file already exist and we are updating the collection, ignore it
-                # even if it's different from the local one.
-                if self.update_collection:
-                    self.bytes_skipped += file_in_collection.size()
-                    return
-
-                if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
-                    if cached_file_data['size'] == file_in_collection.size():
-                        # File already there, skip it.
-                        self.bytes_skipped += cached_file_data['size']
-                        return
-                    elif cached_file_data['size'] > file_in_collection.size():
-                        # File partially uploaded, resume!
-                        resume_offset = file_in_collection.size()
-                    else:
-                        # Inconsistent cache, re-upload the file
-                        self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
+            # Check if file was already uploaded (at least partially)
+            with self._collection_lock:
+                file_in_collection = self._my_collection().find(filename)
+                if self.update:
+                    file_in_local_collection = self._local_collection.find(filename)
+            # Decide what to do with this file.
+            should_upload = False
+            if not file_in_collection:
+                should_upload = True
+            elif self.update and file_in_local_collection != file_in_collection:
+                # File remotely modified.
+                should_upload = True
+            # From here, we are certain that the remote file is the same as last uploaded.
+            elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
+                # Local file didn't change from last run.
+                if cached_file_data['size'] == file_in_collection.size():
+                    # File already there, skip it.
+                    self.bytes_skipped += cached_file_data['size']
+                elif cached_file_data['size'] > file_in_collection.size():
+                    # File partially uploaded, resume!
+                    resume_offset = file_in_collection.size()
+                    should_upload = True
                 else:
-                    # Local file differs from cached data, re-upload it
-                    pass
+                    # Inconsistent cache, re-upload the file
+                    should_upload = True
+                    self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
+            elif cached_file_data['mtime'] < os.path.getmtime(source) and cached_file_data['size'] < os.path.getsize(source):
+                # File with appended data since last run
+                    resume_offset = file_in_collection.size()
+                    should_upload = True
+            else:
+                # Local file differs from cached data, re-upload it
+                should_upload = True
+
+        if should_upload is False:
+            return
+
         with open(source, 'r') as source_fd:
+            if self.resume:
+                with self._state_lock:
+                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
+                    self._state['files'][source]['size'] = os.path.getsize(source)
             if resume_offset > 0:
                 # Start upload where we left off
                 with self._collection_lock:
@@ -540,15 +578,13 @@ class ArvPutUploadJob(object):
                     replication_desired=self.replication_desired)
         return self._collection
 
-    def _setup_state(self, update_collection=None):
+    def _setup_state(self):
         """
         Create a new cache file or load a previously existing one.
         """
         if self.resume:
             md5 = hashlib.md5()
             md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
-            if update_collection:
-                md5.update(update_collection)
             realpaths = sorted(os.path.realpath(path) for path in self.paths)
             md5.update('\0'.join(realpaths))
             if self.filename:
@@ -563,12 +599,17 @@ class ArvPutUploadJob(object):
             with self._state_lock:
                 try:
                     self._state = json.load(self._cache_file)
-                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
+                    if not set(['manifest', 'files', 'collection_uuid']).issubset(set(self._state.keys())):
                         # Cache at least partially incomplete, set up new cache
                         self._state = copy.deepcopy(self.EMPTY_STATE)
                 except ValueError:
                     # Cache file empty, set up new cache
                     self._state = copy.deepcopy(self.EMPTY_STATE)
+
+                # In update mode, load the previous manifest so we can check if files
+                # were modified remotely.
+                if self.update:
+                    self._local_collection = arvados.collection.Collection(self._state['manifest'])
             # Load how many bytes were uploaded on previous run
             with self._collection_lock:
                 self.bytes_written = self._collection_size(self._my_collection())
@@ -746,6 +787,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         reporter = None
 
     bytes_expected = expected_bytes_for(args.paths)
+
     try:
         writer = ArvPutUploadJob(paths = args.paths,
                                  resume = args.resume,
@@ -763,6 +805,12 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
             "arv-put: Another process is already uploading this data.",
             "         Use --no-resume if this is really what you want."])
         sys.exit(1)
+    except ResumeCacheInvalid as error:
+        print >>stderr, "\n".join([
+            "arv-put: %s" % str(error),
+            "         Use --no-resume or delete/move the cache file to upload to a new collection.",
+            "         Use --update-collection otherwise."])
+        sys.exit(1)
     except CollectionUpdateError as error:
         print >>stderr, "\n".join([
             "arv-put: %s" % str(error)])
@@ -780,7 +828,13 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
 
     writer.report_progress()
     output = None
-    writer.start()
+    try:
+        writer.start(save_collection=not(args.stream or args.raw))
+    except arvados.errors.ApiError as error:
+        print >>stderr, "\n".join([
+            "arv-put: %s" % str(error)])
+        sys.exit(1)
+
     if args.progress:  # Print newline to split stderr from stdout for humans.
         print >>stderr
 
@@ -793,7 +847,6 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         output = ','.join(writer.data_locators())
     else:
         try:
-            writer.save_collection()
             if args.update_collection:
                 print >>stderr, "Collection updated: '{}'".format(writer.collection_name())
             else:
@@ -823,7 +876,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         sys.exit(status)
 
     # Success!
-    writer.destroy_cache()
+    #writer.destroy_cache()
     return output
 
 

commit 845d6e6521b284a111534447919a6bc594573ee1
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Fri Nov 11 20:03:16 2016 -0300

    10383: Fixed Collection.find() so that it also returns None when an intermediate component of the path does not exist. Updated test to cover more cases.

diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 27aad03..8613f1f 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -565,16 +565,23 @@ class RichCollectionBase(CollectionBase):
     def find(self, path):
         """Recursively search the specified file path.
 
-        May return either a Collection or ArvadosFile.  Return None if not
+        May return either a Collection or ArvadosFile. Return None if not
         found.
+        If path is invalid (ex: starts with '/'), an IOError exception will be
+        raised.
 
         """
         if not path:
             raise errors.ArgumentError("Parameter 'path' is empty.")
 
         pathcomponents = path.split("/", 1)
+        if pathcomponents[0] == '':
+            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
+
         item = self._items.get(pathcomponents[0])
-        if len(pathcomponents) == 1:
+        if item is None:
+            return None
+        elif len(pathcomponents) == 1:
             return item
         else:
             if isinstance(item, RichCollectionBase):
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index fc30a24..0e3d5e1 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -861,6 +861,8 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
             c.find("/.")
         with self.assertRaises(arvados.errors.ArgumentError):
             c.find("")
+        self.assertIs(c.find("./nonexistant.txt"), None)
+        self.assertIs(c.find("./nonexistantsubdir/nonexistant.txt"), None)
 
     def test_remove_in_subdir(self):
         c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n')

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list