[ARVADOS] created: 391a1d8378b4bc6b17b71904b3d6494160b51627

Git user git at public.curoverse.com
Fri Nov 4 17:21:12 EDT 2016


        at  391a1d8378b4bc6b17b71904b3d6494160b51627 (commit)


commit 391a1d8378b4bc6b17b71904b3d6494160b51627
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Fri Nov 4 18:10:53 2016 -0300

    10383: Added --update-collection parameter to arv-put so that it uploads new files into an existing collection.

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 34cef67..e428eaa 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -7,21 +7,22 @@ import argparse
 import arvados
 import arvados.collection
 import base64
+import copy
 import datetime
 import errno
 import fcntl
 import hashlib
 import json
+import logging
 import os
 import pwd
-import time
+import re
 import signal
 import socket
 import sys
 import tempfile
 import threading
-import copy
-import logging
+import time
 from apiclient import errors as apiclient_errors
 
 import arvados.commands._util as arv_cmd
@@ -96,6 +97,12 @@ separated by commas, with a trailing newline. Do not store a
 manifest.
 """)
 
+_group.add_argument('--update-collection', type=str, default=None,
+                    dest='update_collection', help="""
+Update an existing collection identified by the given Arvados collection
+UUID or manifest block locator. All new local files will be uploaded.
+""")
+
 upload_opts.add_argument('--use-filename', type=str, default=None,
                          dest='filename', help="""
 Synonym for --filename.
@@ -188,12 +195,21 @@ def parse_arguments(arguments):
         args.progress = True
 
     if args.paths == ['-']:
+        if args.update_collection:
+            arg_parser.error("""
+    --update-collection cannot be used when reading from stdin.
+    """)
         args.resume = False
         if not args.filename:
             args.filename = 'stdin'
 
     return args
 
+
+class CollectionUpdateError(Exception):
+    pass
+
+
 class ResumeCacheConflict(Exception):
     pass
 
@@ -290,9 +306,10 @@ class ArvPutUploadJob(object):
     def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
                  name=None, owner_uuid=None, ensure_unique_name=False,
                  num_retries=None, replication_desired=None,
-                 filename=None, update_time=1.0):
+                 filename=None, update_time=1.0, update_collection=None):
         self.paths = paths
         self.resume = resume
+        self.update_collection = False
         self.reporter = reporter
         self.bytes_expected = bytes_expected
         self.bytes_written = 0
@@ -313,8 +330,21 @@ class ArvPutUploadJob(object):
         self._checkpointer = threading.Thread(target=self._update_task)
         self._update_task_time = update_time  # How many seconds wait between update runs
         self.logger = logging.getLogger('arvados.arv_put')
+        # Load an already existing collection for update
+        if update_collection:
+            if re.match(arvados.util.keep_locator_pattern, update_collection) or re.match(arvados.util.collection_uuid_pattern, update_collection):
+                try:
+                    self._collection = arvados.collection.Collection(update_collection)
+                except arvados.errors.ApiError as error:
+                    raise CollectionUpdateError("Cannot update collection {} ({})".format(update_collection, error))
+                else:
+                    self.update_collection = True
+                    self.resume = True
+            else:
+                # Collection locator provided, but in an unknown format
+                raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
         # Load cached data if any and if needed
-        self._setup_state()
+        self._setup_state(update_collection)
 
     def start(self):
         """
@@ -345,10 +375,13 @@ class ArvPutUploadJob(object):
 
     def save_collection(self):
         with self._collection_lock:
-            self._my_collection().save_new(
-                name=self.name, owner_uuid=self.owner_uuid,
-                ensure_unique_name=self.ensure_unique_name,
-                num_retries=self.num_retries)
+            if self.update_collection:
+                self._my_collection().save(num_retries = self.num_retries)
+            else:
+                self._my_collection().save_new(
+                    name=self.name, owner_uuid=self.owner_uuid,
+                    ensure_unique_name=self.ensure_unique_name,
+                    num_retries=self.num_retries)
 
     def destroy_cache(self):
         if self.resume:
@@ -433,18 +466,23 @@ class ArvPutUploadJob(object):
                 except IOError:
                     # Not found
                     file_in_collection = None
-            # If no previous cached data on this file, store it for an eventual
-            # repeated run.
-            if source not in self._state['files']:
-                with self._state_lock:
+            with self._state_lock:
+                # If no previous cached data on this file, store it for an eventual
+                # repeated run.
+                if source not in self._state['files']:
                     self._state['files'][source] = {
                         'mtime': os.path.getmtime(source),
                         'size' : os.path.getsize(source)
                     }
-            with self._state_lock:
                 cached_file_data = self._state['files'][source]
             # See if this file was already uploaded at least partially
             if file_in_collection:
+                # If file already exists and we are updating the collection, ignore it
+                # even if it's different from the local one.
+                if self.update_collection:
+                    self.bytes_skipped += file_in_collection.size()
+                    return
+
                 if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
                     if cached_file_data['size'] == file_in_collection.size():
                         # File already there, skip it.
@@ -502,13 +540,15 @@ class ArvPutUploadJob(object):
                     replication_desired=self.replication_desired)
         return self._collection
 
-    def _setup_state(self):
+    def _setup_state(self, update_collection=None):
         """
         Create a new cache file or load a previously existing one.
         """
         if self.resume:
             md5 = hashlib.md5()
             md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
+            if update_collection:
+                md5.update(update_collection)
             realpaths = sorted(os.path.realpath(path) for path in self.paths)
             md5.update('\0'.join(realpaths))
             if self.filename:
@@ -716,19 +756,24 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
                                  replication_desired = args.replication,
                                  name = collection_name,
                                  owner_uuid = project_uuid,
-                                 ensure_unique_name = True)
+                                 ensure_unique_name = True,
+                                 update_collection = args.update_collection)
     except ResumeCacheConflict:
         print >>stderr, "\n".join([
             "arv-put: Another process is already uploading this data.",
             "         Use --no-resume if this is really what you want."])
         sys.exit(1)
+    except CollectionUpdateError as error:
+        print >>stderr, "\n".join([
+            "arv-put: %s" % str(error)])
+        sys.exit(1)
 
     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
     # the originals.
     orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                             for sigcode in CAUGHT_SIGNALS}
 
-    if args.resume and writer.bytes_written > 0:
+    if not args.update_collection and args.resume and writer.bytes_written > 0:
         print >>stderr, "\n".join([
                 "arv-put: Resuming previous upload from last checkpoint.",
                 "         Use the --no-resume option to start over."])
@@ -749,7 +794,10 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     else:
         try:
             writer.save_collection()
-            print >>stderr, "Collection saved as '%s'" % writer.collection_name()
+            if args.update_collection:
+                print >>stderr, "Collection updated: '{}'".format(writer.collection_name())
+            else:
+                print >>stderr, "Collection saved as '{}'".format(writer.collection_name())
             if args.portable_data_hash:
                 output = writer.portable_data_hash()
             else:
diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
index 7a0120c..898349c 100644
--- a/sdk/python/tests/test_arv_put.py
+++ b/sdk/python/tests/test_arv_put.py
@@ -240,6 +240,7 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
 
 class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                           ArvadosBaseTestCase):
+
     def setUp(self):
         super(ArvPutUploadJobTest, self).setUp()
         run_test_server.authorize_with('active')
@@ -624,6 +625,21 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
         self.assertEqual(1, len(collection_list))
         return collection_list[0]
 
+    def test_put_collection_with_later_update(self):
+        tmpdir = self.make_tmpdir()
+        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
+        self.assertNotEqual(None, col['uuid'])
+        # Add a new file to the directory
+        with open(os.path.join(tmpdir, 'file2'), 'w') as f:
+            f.write('The quick brown fox jumped over the lazy dog')
+        updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
+        self.assertEqual(col['uuid'], updated_col['uuid'])
+        # Get the manifest and check that the new file is being included
+        c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
+        self.assertRegexpMatches(c['manifest_text'], r'^\. .*:44:file2\n')
+
     def test_put_collection_with_high_redundancy(self):
         # Write empty data: we're not testing CollectionWriter, just
         # making sure collections.create tells the API server what our

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list