[ARVADOS] updated: 0d2d039368f65490e9479bd7500a1f8bdf30849c

Git user git at public.curoverse.com
Mon Nov 21 17:26:58 EST 2016


Summary of changes:
 sdk/python/arvados/collection.py   |   2 +-
 sdk/python/arvados/commands/put.py | 368 +++++++++++++++++--------------------
 2 files changed, 169 insertions(+), 201 deletions(-)

       via  0d2d039368f65490e9479bd7500a1f8bdf30849c (commit)
       via  fd7db907627aac75dae62430b6f2fa948719a3af (commit)
       via  53d8d85b53e445ffb8c16eed86145adee2ff9e37 (commit)
       via  261b9d42a367704e4815f3d59565467b98853787 (commit)
       via  5413abe433a089f11f7ea595073a6a20ffa371de (commit)
       via  bfabb2a8fd5fc3914035a7e2be6bd29d14fc7850 (commit)
       via  ec5ec8c10756217482930b0ad763ff174b7843fd (commit)
       via  17b7582b6116a4891a27769ee06f9a604bbabdd7 (commit)
      from  a3d45597b1f4ea9443d35370ebfa7925fdd5ce90 (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit 0d2d039368f65490e9479bd7500a1f8bdf30849c
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Mon Nov 21 19:23:20 2016 -0300

    10383: Several fixes/enhancements to arv-put:
    * Added --no-cache to avoid creating/loading a cache file.
    * Removed unnecessary code when writing empty ArvadosFiles.
    * Use absolute paths in the cache index so they are not relative to the CWD and the cache can be used from anywhere on the system.

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index b88fdc6..c7de888 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -170,6 +170,16 @@ _group.add_argument('--no-resume', action='store_false', dest='resume',
 Do not continue interrupted uploads from cached state.
 """)
 
+_group = run_opts.add_mutually_exclusive_group()
+_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
+                    help="""
+Save upload state in a cache file for resuming (default).
+""")
+_group.add_argument('--no-cache', action='store_false', dest='use_cache',
+                    help="""
+Do not save upload state in a cache file for resuming.
+""")
+
 arg_parser = argparse.ArgumentParser(
     description='Copy data from the local filesystem to Keep.',
     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
@@ -194,12 +204,17 @@ def parse_arguments(arguments):
         and os.isatty(sys.stderr.fileno())):
         args.progress = True
 
+    # Turn off --resume (default) if --no-cache is used.
+    if not args.use_cache:
+        args.resume = False
+
     if args.paths == ['-']:
         if args.update_collection:
             arg_parser.error("""
     --update-collection cannot be used when reading from stdin.
     """)
         args.resume = False
+        args.use_cache = False
         if not args.filename:
             args.filename = 'stdin'
 
@@ -303,12 +318,13 @@ class ArvPutUploadJob(object):
         'files' : {} # Previous run file list: {path : {size, mtime}}
     }
 
-    def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
-                 name=None, owner_uuid=None, ensure_unique_name=False,
-                 num_retries=None, replication_desired=None,
+    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
+                 bytes_expected=None, name=None, owner_uuid=None,
+                 ensure_unique_name=False, num_retries=None, replication_desired=None,
                  filename=None, update_time=1.0, update_collection=None):
         self.paths = paths
         self.resume = resume
+        self.use_cache = use_cache
         self.update = False
         self.reporter = reporter
         self.bytes_expected = bytes_expected
@@ -333,6 +349,9 @@ class ArvPutUploadJob(object):
         self._update_task_time = update_time  # How many seconds to wait between update runs
         self.logger = logging.getLogger('arvados.arv_put')
 
+        if not self.use_cache and self.resume:
+            raise ArgumentError('resume cannot be True when use_cache is False')
+
         # Load cached data if any and if needed
         self._setup_state(update_collection)
 
@@ -348,29 +367,32 @@ class ArvPutUploadJob(object):
                 if path == '-':
                     self._write_stdin(self.filename or 'stdin')
                 elif os.path.isdir(path):
-                    if path == '.' or path == './' or os.path.dirname(path) == '':
-                        dirname = ''
-                    else:
-                        dirname = os.path.dirname(path) + '/'
+                    # Use absolute paths in the cache index so the CWD doesn't
+                    # interfere with the caching logic.
+                    prefixdir = path = os.path.abspath(path)
+                    if prefixdir != '/':
+                        prefixdir += '/'
                     for root, dirs, files in os.walk(path):
                         # Make os.walk()'s dir traversing order deterministic
                         dirs.sort()
                         files.sort()
                         for f in files:
                             self._write_file(os.path.join(root, f),
-                                             os.path.join(root[len(dirname):], f))
+                                             os.path.join(root[len(prefixdir):], f))
                 else:
-                    self._write_file(path, self.filename or os.path.basename(path))
+                    self._write_file(os.path.abspath(path),
+                                     self.filename or os.path.basename(path))
         finally:
             # Stop the thread before doing anything else
             self._stop_checkpointer.set()
             self._checkpointer.join()
-            # Commit all & one last _update()
-            self.manifest_text()
+            # Commit all pending blocks & one last _update()
+            self._local_collection.manifest_text()
+            self._update(final=True)
+            if self.use_cache:
+                self._cache_file.close()
             if save_collection:
                 self.save_collection()
-            self._update()
-            self._cache_file.close()
             # Correct the final written bytes count
             self.bytes_written -= self.bytes_skipped
 
@@ -425,17 +447,21 @@ class ArvPutUploadJob(object):
         while not self._stop_checkpointer.wait(self._update_task_time):
             self._update()
 
-    def _update(self):
+    def _update(self, final=False):
         """
         Update cached manifest text and report progress.
         """
         with self._collection_lock:
             self.bytes_written = self._collection_size(self._local_collection)
-            # Update cache, if resume enabled
-            with self._state_lock:
-                # Get the manifest text without committing pending blocks
-                self._state['manifest'] = self._local_collection._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
-            self._save_state()
+            if self.use_cache:
+                # Update cache
+                with self._state_lock:
+                    if final:
+                        self._state['manifest'] = self._local_collection.manifest_text()
+                    else:
+                        # Get the manifest text without comitting pending blocks
+                        self._state['manifest'] = self._local_collection._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
+                self._save_state()
         # Call the reporter, if any
         self.report_progress()
 
@@ -513,14 +539,10 @@ class ArvPutUploadJob(object):
                 output.close(flush=False)
 
     def _write(self, source_fd, output):
-        first_read = True
         while True:
             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
-            # Allow an empty file to be written
-            if not data and not first_read:
+            if not data:
                 break
-            if first_read:
-                first_read = False
             output.write(data)
 
     def _my_collection(self):
@@ -543,30 +565,35 @@ class ArvPutUploadJob(object):
             # Collection locator provided, but unknown format
             raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
 
-        # Set up cache file name from input paths.
-        md5 = hashlib.md5()
-        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
-        realpaths = sorted(os.path.realpath(path) for path in self.paths)
-        md5.update('\0'.join(realpaths))
-        if self.filename:
-            md5.update(self.filename)
-        cache_filename = md5.hexdigest()
-        self._cache_file = open(os.path.join(
-            arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
-            cache_filename), 'a+')
-        self._cache_filename = self._cache_file.name
-        self._lock_file(self._cache_file)
-        self._cache_file.seek(0)
+        if self.use_cache:
+            # Set up cache file name from input paths.
+            md5 = hashlib.md5()
+            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
+            realpaths = sorted(os.path.realpath(path) for path in self.paths)
+            md5.update('\0'.join(realpaths))
+            if self.filename:
+                md5.update(self.filename)
+            cache_filename = md5.hexdigest()
+            self._cache_file = open(os.path.join(
+                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
+                cache_filename), 'a+')
+            self._cache_filename = self._cache_file.name
+            self._lock_file(self._cache_file)
+            self._cache_file.seek(0)
+
         with self._state_lock:
-            try:
-                self._state = json.load(self._cache_file)
-                if not set(['manifest', 'files']).issubset(set(self._state.keys())):
-                    # Cache at least partially incomplete, set up new cache
+            if self.use_cache:
+                try:
+                    self._state = json.load(self._cache_file)
+                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
+                        # Cache at least partially incomplete, set up new cache
+                        self._state = copy.deepcopy(self.EMPTY_STATE)
+                except ValueError:
+                    # Cache file empty, set up new cache
                     self._state = copy.deepcopy(self.EMPTY_STATE)
-            except ValueError:
-                # Cache file empty, set up new cache
+            else:
+                # No cache file, set empty state
                 self._state = copy.deepcopy(self.EMPTY_STATE)
-
             # Load the previous manifest so we can check if files were modified remotely.
             self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired)
         # Load how many bytes were uploaded on previous run
@@ -585,7 +612,7 @@ class ArvPutUploadJob(object):
         """
         try:
             with self._state_lock:
-                state = self._state
+                state = copy.deepcopy(self._state)
             new_cache_fd, new_cache_name = tempfile.mkstemp(
                 dir=os.path.dirname(self._cache_filename))
             self._lock_file(new_cache_fd)
@@ -738,6 +765,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     try:
         writer = ArvPutUploadJob(paths = args.paths,
                                  resume = args.resume,
+                                 use_cache = args.use_cache,
                                  filename = args.filename,
                                  reporter = reporter,
                                  bytes_expected = bytes_expected,
@@ -750,7 +778,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     except ResumeCacheConflict:
         print >>stderr, "\n".join([
             "arv-put: Another process is already uploading this data.",
-            "         Use --no-resume if this is really what you want."])
+            "         Use --no-cache if this is really what you want."])
         sys.exit(1)
     except CollectionUpdateError as error:
         print >>stderr, "\n".join([

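The net effect of this commit is that caching and resuming become independent, explicit switches: --no-cache skips the cache file entirely, and parse_arguments() forces resume off whenever the cache is disabled (stdin uploads disable both). A minimal, standalone sketch of that flag wiring, using the same argparse pattern and dest names as the diff above (the rest of arv-put's option plumbing is omitted):

    import argparse

    run_opts = argparse.ArgumentParser()
    _group = run_opts.add_mutually_exclusive_group()
    _group.add_argument('--cache', action='store_true', dest='use_cache',
                        default=True,
                        help='Save upload state in a cache file for resuming (default).')
    _group.add_argument('--no-cache', action='store_false', dest='use_cache',
                        help='Do not save upload state in a cache file for resuming.')
    _group = run_opts.add_mutually_exclusive_group()
    _group.add_argument('--resume', action='store_true', dest='resume', default=True)
    _group.add_argument('--no-resume', action='store_false', dest='resume')

    args = run_opts.parse_args(['--no-cache'])
    # Turn off --resume (default) if --no-cache is used: there is no
    # saved state to resume from without a cache file.
    if not args.use_cache:
        args.resume = False
    assert (args.use_cache, args.resume) == (False, False)
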
commit fd7db907627aac75dae62430b6f2fa948719a3af
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Mon Nov 21 19:22:31 2016 -0300

    10383: Fixed an error when using Collection.copy() to overwrite an existing ArvadosFile.

diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 8613f1f..812438e 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -836,7 +836,7 @@ class RichCollectionBase(CollectionBase):
         if target_dir is None:
             raise IOError(errno.ENOENT, "Target directory not found", target_name)
 
-        if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
+        if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
             target_dir = target_dir[target_name]
             target_name = sourcecomponents[-1]
 

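The one-character fix above changes which object the overwrite check inspects: self is the collection copy() was called on, while target_dir may be a subcollection further down the tree. A toy, dict-based sketch of the divergence (Node is a hypothetical stand-in for RichCollectionBase; the real code indexes self[target_name] directly, which would also raise KeyError here, so the sketch uses .get() to stay runnable):

    class Node(dict):
        pass

    root = Node()                    # 'self': collection copy() was called on
    root['subdir'] = Node()          # 'target_dir': where the copy lands
    root['subdir']['data'] = Node()  # existing subcollection being overwritten
    target_dir, target_name = root['subdir'], 'data'

    # Buggy check looks up target_name on the root collection, where it
    # doesn't exist, so the descend-into-directory branch never fires.
    buggy = target_name in target_dir and isinstance(root.get(target_name), Node)
    # Fixed check inspects the directory actually being written to.
    fixed = target_name in target_dir and isinstance(target_dir.get(target_name), Node)
    assert (buggy, fixed) == (False, True)
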
commit 53d8d85b53e445ffb8c16eed86145adee2ff9e37
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Thu Nov 17 18:59:40 2016 -0300

    10383: Unused exception removed.

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 16eba35..b88fdc6 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -210,10 +210,6 @@ class CollectionUpdateError(Exception):
     pass
 
 
-class ResumeCacheInvalid(Exception):
-    pass
-
-
 class ResumeCacheConflict(Exception):
     pass
 
@@ -756,12 +752,6 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
             "arv-put: Another process is already uploading this data.",
             "         Use --no-resume if this is really what you want."])
         sys.exit(1)
-    except ResumeCacheInvalid as error:
-        print >>stderr, "\n".join([
-            "arv-put: %s" % str(error),
-            "         Use --no-resume or delete/move the cache file to upload to a new collection.",
-            "         Use --update-collection otherwise."])
-        sys.exit(1)
     except CollectionUpdateError as error:
         print >>stderr, "\n".join([
             "arv-put: %s" % str(error)])

commit 261b9d42a367704e4815f3d59565467b98853787
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Thu Nov 17 18:53:54 2016 -0300

    10383: Tidying up internal collection management and naming.

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index d8eb872..16eba35 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -328,9 +328,9 @@ class ArvPutUploadJob(object):
         self._state = None # Previous run state (file list & manifest)
         self._current_files = [] # Current run file list
         self._cache_file = None
-        self._collection = None
         self._collection_lock = threading.Lock()
-        self._local_collection = None # Previous run collection manifest
+        self._remote_collection = None # Collection being updated (if asked)
+        self._local_collection = None # Collection from previous run manifest
         self._file_paths = [] # Files to be updated in remote collection
         self._stop_checkpointer = threading.Event()
         self._checkpointer = threading.Thread(target=self._update_task)
@@ -382,19 +382,19 @@ class ArvPutUploadJob(object):
         if self.update:
             # Check if files should be updated on the remote collection.
             for fp in self._file_paths:
-                remote_file = self._collection.find(fp)
+                remote_file = self._remote_collection.find(fp)
                 if not remote_file:
                     # File doesn't exist on remote collection, copy it.
-                    self._collection.copy(fp, fp, self._local_collection)
+                    self._remote_collection.copy(fp, fp, self._local_collection)
                 elif remote_file != self._local_collection.find(fp):
                     # A different file exists on remote collection, overwrite it.
-                    self._collection.copy(fp, fp, self._local_collection, overwrite=True)
+                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                 else:
                     # The file already exists on remote collection, skip it.
                     pass
-            self._collection.save(num_retries=self.num_retries)
+            self._remote_collection.save(num_retries=self.num_retries)
         else:
-            self._my_collection().save_new(
+            self._local_collection.save_new(
                 name=self.name, owner_uuid=self.owner_uuid,
                 ensure_unique_name=self.ensure_unique_name,
                 num_retries=self.num_retries)
@@ -528,7 +528,7 @@ class ArvPutUploadJob(object):
             output.write(data)
 
     def _my_collection(self):
-        return self._local_collection
+        return self._remote_collection if self.update else self._local_collection
 
     def _setup_state(self, update_collection):
         """
@@ -538,7 +538,7 @@ class ArvPutUploadJob(object):
         if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                           update_collection):
             try:
-                self._collection = arvados.collection.Collection(update_collection)
+                self._remote_collection = arvados.collection.Collection(update_collection)
             except arvados.errors.ApiError as error:
                 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
             else:
@@ -575,7 +575,7 @@ class ArvPutUploadJob(object):
             self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired)
         # Load how many bytes were uploaded on previous run
         with self._collection_lock:
-            self.bytes_written = self._collection_size(self._my_collection())
+            self.bytes_written = self._collection_size(self._local_collection)
 
     def _lock_file(self, fileobj):
         try:
@@ -609,21 +609,16 @@ class ArvPutUploadJob(object):
             self._cache_file = new_cache
 
     def collection_name(self):
-        with self._collection_lock:
-            name = self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
-        return name
+        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
 
     def manifest_locator(self):
-        locator = self._my_collection().manifest_locator()
-        return locator
+        return self._my_collection().manifest_locator()
 
     def portable_data_hash(self):
-        datahash = self._my_collection().portable_data_hash()
-        return datahash
+        return self._my_collection().portable_data_hash()
 
     def manifest_text(self, stream_name=".", strip=False, normalize=False):
-        manifest = self._my_collection().manifest_text(stream_name, strip, normalize)
-        return manifest
+        return self._my_collection().manifest_text(stream_name, strip, normalize)
 
     def _datablocks_on_item(self, item):
         """

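With the collections split into _local_collection (built from the cached manifest) and _remote_collection (the one being updated), save_collection() reduces to a three-way decision per uploaded path: copy the file if it is absent remotely, overwrite it if it differs, skip it if identical. A hedged sketch of that loop with plain dicts standing in for Collection objects (real comparisons go through Collection.find() on ArvadosFile objects):

    # path -> content fingerprint; stand-ins for collection contents
    remote = {'a.txt': 'v1', 'b.txt': 'v1'}
    local = {'a.txt': 'v1', 'b.txt': 'v2', 'c.txt': 'v1'}
    file_paths = ['a.txt', 'b.txt', 'c.txt']  # paths seen this run

    for fp in file_paths:
        remote_file = remote.get(fp)
        if remote_file is None:
            remote[fp] = local[fp]   # file doesn't exist remotely: copy it
        elif remote_file != local[fp]:
            remote[fp] = local[fp]   # a different file exists: overwrite it
        else:
            pass                     # identical file already there: skip it

    assert remote == {'a.txt': 'v1', 'b.txt': 'v2', 'c.txt': 'v1'}
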
commit 5413abe433a089f11f7ea595073a6a20ffa371de
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Thu Nov 17 18:13:17 2016 -0300

    10383: --update-collection is no longer mutually exclusive with --resume and --no-resume.
    Also moved the update-collection setup code into the setup method.

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index d57d0ad..d8eb872 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -97,8 +97,8 @@ separated by commas, with a trailing newline. Do not store a
 manifest.
 """)
 
-_group.add_argument('--update-collection', type=str, default=None,
-                    dest='update_collection', metavar="UUID", help="""
+upload_opts.add_argument('--update-collection', type=str, default=None,
+                         dest='update_collection', metavar="UUID", help="""
 Update an existing collection identified by the given Arvados collection
 UUID. All new local files will be uploaded.
 """)
@@ -337,24 +337,8 @@ class ArvPutUploadJob(object):
         self._update_task_time = update_time  # How many seconds to wait between update runs
         self.logger = logging.getLogger('arvados.arv_put')
 
-        # Load an already existing collection for update
-        if update_collection and re.match(arvados.util.collection_uuid_pattern,
-                                          update_collection):
-            try:
-                self._collection = arvados.collection.Collection(update_collection)
-            except arvados.errors.ApiError as error:
-                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
-            else:
-                self.update = True
-        elif update_collection:
-            # Collection locator provided, but unknown format
-            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
-        else:
-            # No collection asked for update, set up an empty one.
-            self._collection = arvados.collection.Collection(replication_desired=self.replication_desired)
-
         # Load cached data if any and if needed
-        self._setup_state()
+        self._setup_state(update_collection)
 
     def start(self, save_collection):
         """
@@ -546,10 +530,24 @@ class ArvPutUploadJob(object):
     def _my_collection(self):
         return self._local_collection
 
-    def _setup_state(self):
+    def _setup_state(self, update_collection):
         """
         Create a new cache file or load a previously existing one.
         """
+        # Load an already existing collection for update
+        if update_collection and re.match(arvados.util.collection_uuid_pattern,
+                                          update_collection):
+            try:
+                self._collection = arvados.collection.Collection(update_collection)
+            except arvados.errors.ApiError as error:
+                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
+            else:
+                self.update = True
+        elif update_collection:
+            # Collection locator provided, but unknown format
+            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
+
+        # Set up cache file name from input paths.
         md5 = hashlib.md5()
         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
         realpaths = sorted(os.path.realpath(path) for path in self.paths)

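The cache file name that _setup_state() now derives (visible at the tail of the hunk above) is a pure function of the API host, the sorted real paths of the inputs, and the optional --filename override, so the same invocation always locates the same resume cache regardless of the CWD. A standalone sketch of that derivation; the '!nohost' fallback and the '\0' join come straight from the code, while the .encode() calls are an assumption needed to make the sketch run on Python 3 (the original is Python 2):

    import hashlib
    import os

    def cache_filename(paths, api_host=None, filename=None):
        # Mirror the md5-based name arv-put uses for its resume cache.
        md5 = hashlib.md5()
        md5.update((api_host or '!nohost').encode())
        realpaths = sorted(os.path.realpath(p) for p in paths)
        md5.update('\0'.join(realpaths).encode())
        if filename:
            md5.update(filename.encode())
        return md5.hexdigest()

    # The same inputs, in any order, map to the same cache file name.
    assert cache_filename(['b', 'a']) == cache_filename(['a', 'b'])
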
commit bfabb2a8fd5fc3914035a7e2be6bd29d14fc7850
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Thu Nov 17 17:54:15 2016 -0300

    10383: Removing unnecessary locking, as the Collection class is thread-safe.

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 9596468..d57d0ad 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -395,26 +395,25 @@ class ArvPutUploadJob(object):
             self.bytes_written -= self.bytes_skipped
 
     def save_collection(self):
-        with self._collection_lock:
-            if self.update:
-                # Check if files should be updated on the remote collection.
-                for fp in self._file_paths:
-                    remote_file = self._collection.find(fp)
-                    if not remote_file:
-                        # File doesn't exist on remote collection, copy it.
-                        self._collection.copy(fp, fp, self._local_collection)
-                    elif remote_file != self._local_collection.find(fp):
-                        # A different file exists on remote collection, overwrite it.
-                        self._collection.copy(fp, fp, self._local_collection, overwrite=True)
-                    else:
-                        # The file already exists on remote collection, skip it.
-                        pass
-                self._collection.save(num_retries=self.num_retries)
-            else:
-                self._my_collection().save_new(
-                    name=self.name, owner_uuid=self.owner_uuid,
-                    ensure_unique_name=self.ensure_unique_name,
-                    num_retries=self.num_retries)
+        if self.update:
+            # Check if files should be updated on the remote collection.
+            for fp in self._file_paths:
+                remote_file = self._collection.find(fp)
+                if not remote_file:
+                    # File doesn't exist on remote collection, copy it.
+                    self._collection.copy(fp, fp, self._local_collection)
+                elif remote_file != self._local_collection.find(fp):
+                    # A different file exists on remote collection, overwrite it.
+                    self._collection.copy(fp, fp, self._local_collection, overwrite=True)
+                else:
+                    # The file already exists on remote collection, skip it.
+                    pass
+            self._collection.save(num_retries=self.num_retries)
+        else:
+            self._my_collection().save_new(
+                name=self.name, owner_uuid=self.owner_uuid,
+                ensure_unique_name=self.ensure_unique_name,
+                num_retries=self.num_retries)
 
     def destroy_cache(self):
         if self.resume:
@@ -465,8 +464,7 @@ class ArvPutUploadJob(object):
             self.reporter(self.bytes_written, self.bytes_expected)
 
     def _write_stdin(self, filename):
-        with self._collection_lock:
-            output = self._local_collection.open(filename, 'w')
+        output = self._local_collection.open(filename, 'w')
         self._write(sys.stdin, output)
         output.close()
 
@@ -490,8 +488,7 @@ class ArvPutUploadJob(object):
             cached_file_data = self._state['files'][source]
 
         # Check if file was already uploaded (at least partially)
-        with self._collection_lock:
-            file_in_local_collection = self._local_collection.find(filename)
+        file_in_local_collection = self._local_collection.find(filename)
 
         # If not resuming, upload the full file.
         if not self.resume:
@@ -526,14 +523,12 @@ class ArvPutUploadJob(object):
                     self._state['files'][source]['size'] = os.path.getsize(source)
                 if resume_offset > 0:
                     # Start upload where we left off
-                    with self._collection_lock:
-                        output = self._local_collection.open(filename, 'a')
+                    output = self._local_collection.open(filename, 'a')
                     source_fd.seek(resume_offset)
                     self.bytes_skipped += resume_offset
                 else:
                     # Start from scratch
-                    with self._collection_lock:
-                        output = self._local_collection.open(filename, 'w')
+                    output = self._local_collection.open(filename, 'w')
                 self._write(source_fd, output)
                 output.close(flush=False)
 
@@ -621,18 +616,15 @@ class ArvPutUploadJob(object):
         return name
 
     def manifest_locator(self):
-        with self._collection_lock:
-            locator = self._my_collection().manifest_locator()
+        locator = self._my_collection().manifest_locator()
         return locator
 
     def portable_data_hash(self):
-        with self._collection_lock:
-            datahash = self._my_collection().portable_data_hash()
+        datahash = self._my_collection().portable_data_hash()
         return datahash
 
     def manifest_text(self, stream_name=".", strip=False, normalize=False):
-        with self._collection_lock:
-            manifest = self._my_collection().manifest_text(stream_name, strip, normalize)
+        manifest = self._my_collection().manifest_text(stream_name, strip, normalize)
         return manifest
 
     def _datablocks_on_item(self, item):

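With the per-call locks gone, the remaining synchronization is the checkpointer thread, which relies on threading.Event.wait() as an interruptible sleep: _update_task() checkpoints every _update_task_time seconds until the stop event fires. A minimal sketch of that pattern under the same names, with a placeholder _update() body:

    import threading

    stop = threading.Event()

    def _update():
        pass  # placeholder: arv-put refreshes the cached manifest here

    def _update_task(interval=1.0):
        # wait() returns False on timeout and True once the event is
        # set, so this checkpoints every `interval` seconds until stopped.
        while not stop.wait(interval):
            _update()

    checkpointer = threading.Thread(target=_update_task, args=(0.1,))
    checkpointer.start()
    # ... upload work proceeds on the main thread ...
    stop.set()           # wake the checkpointer immediately
    checkpointer.join()  # then wait for it to finish
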
commit ec5ec8c10756217482930b0ad763ff174b7843fd
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Thu Nov 17 14:52:59 2016 -0300

    10383: Reworked the resume and update logic:
    * Eliminated the restriction preventing a collection from being updated again once uploaded.
    * Greatly simplified and corrected the file-resume decision logic.
    * Added deterministic path traversal and uploading.
    * Remote collection updates now use the efficient Collection.copy() method on the cached manifest.

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 911a502..9596468 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -304,8 +304,7 @@ class ArvPutUploadJob(object):
     CACHE_DIR = '.cache/arvados/arv-put'
     EMPTY_STATE = {
         'manifest' : None, # Last saved manifest checkpoint
-        'files' : {}, # Previous run file list: {path : {size, mtime}}
-        'collection_uuid': None, # Saved collection's UUID
+        'files' : {} # Previous run file list: {path : {size, mtime}}
     }
 
     def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
@@ -331,7 +330,8 @@ class ArvPutUploadJob(object):
         self._cache_file = None
         self._collection = None
         self._collection_lock = threading.Lock()
-        self._local_collection = None # Previous collection manifest, used on update mode.
+        self._local_collection = None # Previous run collection manifest
+        self._file_paths = [] # Files to be updated in remote collection
         self._stop_checkpointer = threading.Event()
         self._checkpointer = threading.Thread(target=self._update_task)
         self._update_task_time = update_time  # How many seconds to wait between update runs
@@ -346,22 +346,15 @@ class ArvPutUploadJob(object):
                 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
             else:
                 self.update = True
-                self.resume = True
         elif update_collection:
             # Collection locator provided, but unknown format
             raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
+        else:
+            # No collection asked for update, set up an empty one.
+            self._collection = arvados.collection.Collection(replication_desired=self.replication_desired)
+
         # Load cached data if any and if needed
         self._setup_state()
-        if self.update:
-            # Check if cache data belongs to the collection needed to be updated
-            if self._state['collection_uuid'] is None:
-                raise CollectionUpdateError("An initial upload is needed before being able to update the collection {}.".format(update_collection))
-            elif self._state['collection_uuid'] != update_collection:
-                raise CollectionUpdateError("Cached data doesn't belong to collection {}".format(update_collection))
-        elif self.resume:
-            # Check that cache data doesn't belong to an already created collection
-            if self._state['collection_uuid'] is not None:
-                raise ResumeCacheInvalid("Resume cache file '{}' belongs to existing collection {}".format(self._cache_filename, self._state['collection_uuid']))
 
     def start(self, save_collection):
         """
@@ -380,12 +373,12 @@ class ArvPutUploadJob(object):
                     else:
                         dirname = os.path.dirname(path) + '/'
                     for root, dirs, files in os.walk(path):
-                        # Make os.walk()'s traversing order deterministic
+                        # Make os.walk()'s dir traversing order deterministic
                         dirs.sort()
                         files.sort()
-                        for file in files:
-                            self._write_file(os.path.join(root, file),
-                                             os.path.join(root[len(dirname):], file))
+                        for f in files:
+                            self._write_file(os.path.join(root, f),
+                                             os.path.join(root[len(dirname):], f))
                 else:
                     self._write_file(path, self.filename or os.path.basename(path))
         finally:
@@ -396,18 +389,27 @@ class ArvPutUploadJob(object):
             self.manifest_text()
             if save_collection:
                 self.save_collection()
-                with self._state_lock:
-                    self._state['collection_uuid'] = self._my_collection().manifest_locator()
             self._update()
-            if self.resume:
-                self._cache_file.close()
-                # Correct the final written bytes count
-                self.bytes_written -= self.bytes_skipped
+            self._cache_file.close()
+            # Correct the final written bytes count
+            self.bytes_written -= self.bytes_skipped
 
     def save_collection(self):
         with self._collection_lock:
             if self.update:
-                self._my_collection().save(num_retries = self.num_retries)
+                # Check if files should be updated on the remote collection.
+                for fp in self._file_paths:
+                    remote_file = self._collection.find(fp)
+                    if not remote_file:
+                        # File doesn't exist on remote collection, copy it.
+                        self._collection.copy(fp, fp, self._local_collection)
+                    elif remote_file != self._local_collection.find(fp):
+                        # A different file exists on remote collection, overwrite it.
+                        self._collection.copy(fp, fp, self._local_collection, overwrite=True)
+                    else:
+                        # The file already exists on remote collection, skip it.
+                        pass
+                self._collection.save(num_retries=self.num_retries)
             else:
                 self._my_collection().save_new(
                     name=self.name, owner_uuid=self.owner_uuid,
@@ -449,13 +451,12 @@ class ArvPutUploadJob(object):
         Update cached manifest text and report progress.
         """
         with self._collection_lock:
-            self.bytes_written = self._collection_size(self._my_collection())
+            self.bytes_written = self._collection_size(self._local_collection)
             # Update cache, if resume enabled
-            if self.resume:
-                with self._state_lock:
-                    # Get the manifest text without committing pending blocks
-                    self._state['manifest'] = self._my_collection()._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
-                self._save_state()
+            with self._state_lock:
+                # Get the manifest text without committing pending blocks
+                self._state['manifest'] = self._local_collection._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
+            self._save_state()
         # Call the reporter, if any
         self.report_progress()
 
@@ -465,78 +466,76 @@ class ArvPutUploadJob(object):
 
     def _write_stdin(self, filename):
         with self._collection_lock:
-            output = self._my_collection().open(filename, 'w')
+            output = self._local_collection.open(filename, 'w')
         self._write(sys.stdin, output)
         output.close()
 
     def _write_file(self, source, filename):
         resume_offset = 0
-        should_upload = True
-
-        if self.resume:
-            with self._state_lock:
-                # If no previous cached data on this file, store it for an eventual
-                # repeated run.
-                if source not in self._state['files']:
-                    self._state['files'][source] = {
-                        'mtime': os.path.getmtime(source),
-                        'size' : os.path.getsize(source)
-                    }
-                cached_file_data = self._state['files'][source]
-            # Check if file was already uploaded (at least partially)
-            with self._collection_lock:
-                file_in_collection = self._my_collection().find(filename)
-                if self.update:
-                    file_in_local_collection = self._local_collection.find(filename)
-            # Decide what to do with this file.
-            should_upload = False
-            if not file_in_collection:
+        should_upload = False
+        new_file_in_cache = False
+
+        # Record file path for updating the remote collection before exiting
+        self._file_paths.append(filename)
+
+        with self._state_lock:
+            # If no previous cached data on this file, store it for an eventual
+            # repeated run.
+            if source not in self._state['files']:
+                self._state['files'][source] = {
+                    'mtime': os.path.getmtime(source),
+                    'size' : os.path.getsize(source)
+                }
+                new_file_in_cache = True
+            cached_file_data = self._state['files'][source]
+
+        # Check if file was already uploaded (at least partially)
+        with self._collection_lock:
+            file_in_local_collection = self._local_collection.find(filename)
+
+        # If not resuming, upload the full file.
+        if not self.resume:
+            should_upload = True
+        # New file detected from last run, upload it.
+        elif new_file_in_cache:
+            should_upload = True
+        # Local file didn't change from last run.
+        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
+            if not file_in_local_collection:
+                # File not uploaded yet, upload it completely
                 should_upload = True
-            elif self.update and file_in_local_collection != file_in_collection:
-                # File remotely modified.
+            elif cached_file_data['size'] == file_in_local_collection.size():
+                # File already there, skip it.
+                self.bytes_skipped += cached_file_data['size']
+            elif cached_file_data['size'] > file_in_local_collection.size():
+                # File partially uploaded, resume!
+                resume_offset = file_in_local_collection.size()
                 should_upload = True
-            # From here, we are certain that the remote file is the same as last uploaded.
-            elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
-                # Local file didn't change from last run.
-                if cached_file_data['size'] == file_in_collection.size():
-                    # File already there, skip it.
-                    self.bytes_skipped += cached_file_data['size']
-                elif cached_file_data['size'] > file_in_collection.size():
-                    # File partially uploaded, resume!
-                    resume_offset = file_in_collection.size()
-                    should_upload = True
-                else:
-                    # Inconsistent cache, re-upload the file
-                    should_upload = True
-                    self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
-            elif cached_file_data['mtime'] < os.path.getmtime(source) and cached_file_data['size'] < os.path.getsize(source):
-                # File with appended data since last run
-                    resume_offset = file_in_collection.size()
-                    should_upload = True
             else:
-                # Local file differs from cached data, re-upload it
+                # Inconsistent cache, re-upload the file
                 should_upload = True
+                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
+        # Local file differs from cached data, re-upload it.
+        else:
+            should_upload = True
 
-        if should_upload is False:
-            return
-
-        with open(source, 'r') as source_fd:
-            if self.resume:
+        if should_upload:
+            with open(source, 'r') as source_fd:
                 with self._state_lock:
                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
                     self._state['files'][source]['size'] = os.path.getsize(source)
-            if resume_offset > 0:
-                # Start upload where we left off
-                with self._collection_lock:
-                    output = self._my_collection().open(filename, 'a')
-                source_fd.seek(resume_offset)
-                self.bytes_skipped += resume_offset
-            else:
-                # Start from scratch
-                with self._collection_lock:
-                    output = self._my_collection().open(filename, 'w')
-            self._write(source_fd, output)
-            output.close(flush=False)
+                if resume_offset > 0:
+                    # Start upload where we left off
+                    with self._collection_lock:
+                        output = self._local_collection.open(filename, 'a')
+                    source_fd.seek(resume_offset)
+                    self.bytes_skipped += resume_offset
+                else:
+                    # Start from scratch
+                    with self._collection_lock:
+                        output = self._local_collection.open(filename, 'w')
+                self._write(source_fd, output)
+                output.close(flush=False)
 
     def _write(self, source_fd, output):
         first_read = True
@@ -550,63 +549,41 @@ class ArvPutUploadJob(object):
             output.write(data)
 
     def _my_collection(self):
-        """
-        Create a new collection if none cached. Load it from cache otherwise.
-        """
-        if self._collection is None:
-            with self._state_lock:
-                manifest = self._state['manifest']
-            if self.resume and manifest is not None:
-                # Create collection from saved state
-                self._collection = arvados.collection.Collection(
-                    manifest,
-                    replication_desired=self.replication_desired)
-            else:
-                # Create new collection
-                self._collection = arvados.collection.Collection(
-                    replication_desired=self.replication_desired)
-        return self._collection
+        return self._local_collection
 
     def _setup_state(self):
         """
         Create a new cache file or load a previously existing one.
         """
-        if self.resume:
-            md5 = hashlib.md5()
-            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
-            realpaths = sorted(os.path.realpath(path) for path in self.paths)
-            md5.update('\0'.join(realpaths))
-            if self.filename:
-                md5.update(self.filename)
-            cache_filename = md5.hexdigest()
-            self._cache_file = open(os.path.join(
-                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
-                cache_filename), 'a+')
-            self._cache_filename = self._cache_file.name
-            self._lock_file(self._cache_file)
-            self._cache_file.seek(0)
-            with self._state_lock:
-                try:
-                    self._state = json.load(self._cache_file)
-                    if not set(['manifest', 'files', 'collection_uuid']).issubset(set(self._state.keys())):
-                        # Cache at least partially incomplete, set up new cache
-                        self._state = copy.deepcopy(self.EMPTY_STATE)
-                except ValueError:
-                    # Cache file empty, set up new cache
+        md5 = hashlib.md5()
+        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
+        realpaths = sorted(os.path.realpath(path) for path in self.paths)
+        md5.update('\0'.join(realpaths))
+        if self.filename:
+            md5.update(self.filename)
+        cache_filename = md5.hexdigest()
+        self._cache_file = open(os.path.join(
+            arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
+            cache_filename), 'a+')
+        self._cache_filename = self._cache_file.name
+        self._lock_file(self._cache_file)
+        self._cache_file.seek(0)
+        with self._state_lock:
+            try:
+                self._state = json.load(self._cache_file)
+                if not set(['manifest', 'files']).issubset(set(self._state.keys())):
+                    # Cache at least partially incomplete, set up new cache
                     self._state = copy.deepcopy(self.EMPTY_STATE)
-
-                # In update mode, load the previous manifest so we can check if files
-                # were modified remotely.
-                if self.update:
-                    self._local_collection = arvados.collection.Collection(self._state['manifest'])
-            # Load how many bytes were uploaded on previous run
-            with self._collection_lock:
-                self.bytes_written = self._collection_size(self._my_collection())
-        # No resume required
-        else:
-            with self._state_lock:
+            except ValueError:
+                # Cache file empty, set up new cache
                 self._state = copy.deepcopy(self.EMPTY_STATE)
 
+            # Load the previous manifest so we can check if files were modified remotely.
+            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired)
+        # Load how many bytes were uploaded on previous run
+        with self._collection_lock:
+            self.bytes_written = self._collection_size(self._my_collection())
+
     def _lock_file(self, fileobj):
         try:
             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
@@ -865,7 +842,6 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         sys.exit(status)
 
     # Success!
-    #writer.destroy_cache()
     return output
 
 

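The rewritten _write_file() above reduces the resume question to a chain of cheap checks: upload fully when not resuming or when the file is new to the cache; otherwise compare the cached mtime/size against the filesystem and against how many bytes already reached the local collection. A hedged sketch of just that decision, with plain values in place of ArvadosFile objects (uploaded_size is None when the file is not in the local collection yet):

    def resume_decision(resume, new_in_cache, cached, cur_mtime, cur_size,
                        uploaded_size):
        # Returns (should_upload, resume_offset).
        if not resume or new_in_cache:
            return True, 0                 # no trusted cache: full upload
        if cached['mtime'] == cur_mtime and cached['size'] == cur_size:
            if uploaded_size is None:
                return True, 0             # unchanged, but never uploaded
            if cached['size'] == uploaded_size:
                return False, 0            # fully uploaded already: skip
            if cached['size'] > uploaded_size:
                return True, uploaded_size # partially uploaded: resume
            return True, 0                 # inconsistent cache: re-upload
        return True, 0                     # local file changed: re-upload

    # A 10-byte file with 4 bytes already uploaded resumes at offset 4.
    assert resume_decision(True, False, {'mtime': 1, 'size': 10},
                           1, 10, 4) == (True, 4)
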
commit 17b7582b6116a4891a27769ee06f9a604bbabdd7
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Wed Nov 16 17:08:58 2016 -0300

    10383: Using os.walk() to traverse input directories in a deterministic way.

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 36e9f6c..911a502 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -375,7 +375,17 @@ class ArvPutUploadJob(object):
                 if path == '-':
                     self._write_stdin(self.filename or 'stdin')
                 elif os.path.isdir(path):
-                    self._write_directory_tree(path)
+                    if path == '.' or path == './' or os.path.dirname(path) == '':
+                        dirname = ''
+                    else:
+                        dirname = os.path.dirname(path) + '/'
+                    for root, dirs, files in os.walk(path):
+                        # Make os.walk()'s traversing order deterministic
+                        dirs.sort()
+                        files.sort()
+                        for file in files:
+                            self._write_file(os.path.join(root, file),
+                                             os.path.join(root[len(dirname):], file))
                 else:
                     self._write_file(path, self.filename or os.path.basename(path))
         finally:
@@ -453,27 +463,6 @@ class ArvPutUploadJob(object):
         if self.reporter is not None:
             self.reporter(self.bytes_written, self.bytes_expected)
 
-    def _write_directory_tree(self, path, stream_name="."):
-        # TODO: Check what happens when multiple directories are passed as
-        # arguments.
-        # If the code below is uncommented, integration test
-        # test_ArvPutSignedManifest (tests.test_arv_put.ArvPutIntegrationTest)
-        # fails, I suppose it is because the manifest_uuid changes because
-        # of the dir addition to stream_name.
-
-        # if stream_name == '.':
-        #     stream_name = os.path.join('.', os.path.basename(path))
-        for item in os.listdir(path):
-            item_col_path = os.path.join(stream_name, item)
-            if os.path.isdir(os.path.join(path, item)):
-                self._my_collection().find_or_create(item_col_path,
-                                arvados.collection.COLLECTION)
-                self._write_directory_tree(os.path.join(path, item),
-                                           item_col_path)
-            else:
-                self._write_file(os.path.join(path, item),
-                                 item_col_path)
-
     def _write_stdin(self, filename):
         with self._collection_lock:
             output = self._my_collection().open(filename, 'w')

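os.walk() yields directory and file names in arbitrary, filesystem-dependent order, but it re-reads the dirs list between iterations, so sorting both lists in place makes the entire traversal deterministic; that stable ordering is what the later resume logic depends on. A minimal illustration:

    import os

    def walk_sorted(path):
        # Sorting dirs in place steers os.walk()'s descent order;
        # sorting files fixes the per-directory visit order.
        for root, dirs, files in os.walk(path):
            dirs.sort()
            files.sort()
            for f in files:
                yield os.path.join(root, f)

    # Every run over the same tree yields the same sequence of paths.
    print(list(walk_sorted('.')))
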
-----------------------------------------------------------------------


hooks/post-receive
-- 



