[ARVADOS] created: 391a1d8378b4bc6b17b71904b3d6494160b51627
Git user
git at public.curoverse.com
Fri Nov 4 17:21:12 EDT 2016
at 391a1d8378b4bc6b17b71904b3d6494160b51627 (commit)
commit 391a1d8378b4bc6b17b71904b3d6494160b51627
Author: Lucas Di Pentima <lucas at curoverse.com>
Date: Fri Nov 4 18:10:53 2016 -0300
10383: Added --update-collection parameter to arv-put so that it uploads new files into an existing collection.
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 34cef67..e428eaa 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -7,21 +7,22 @@ import argparse
import arvados
import arvados.collection
import base64
+import copy
import datetime
import errno
import fcntl
import hashlib
import json
+import logging
import os
import pwd
-import time
+import re
import signal
import socket
import sys
import tempfile
import threading
-import copy
-import logging
+import time
from apiclient import errors as apiclient_errors
import arvados.commands._util as arv_cmd
@@ -96,6 +97,12 @@ separated by commas, with a trailing newline. Do not store a
manifest.
""")
+_group.add_argument('--update-collection', type=str, default=None,
+ dest='update_collection', help="""
+Update an existing collection identified by the given Arvados collection
+UUID or manifest block locator. All new local files will be uploaded.
+""")
+
upload_opts.add_argument('--use-filename', type=str, default=None,
dest='filename', help="""
Synonym for --filename.
@@ -188,12 +195,21 @@ def parse_arguments(arguments):
args.progress = True
if args.paths == ['-']:
+ if args.update_collection:
+ arg_parser.error("""
+ --update-collection cannot be used when reading from stdin.
+ """)
args.resume = False
if not args.filename:
args.filename = 'stdin'
return args
+
+class CollectionUpdateError(Exception):
+ pass
+
+
class ResumeCacheConflict(Exception):
pass
@@ -290,9 +306,10 @@ class ArvPutUploadJob(object):
def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
name=None, owner_uuid=None, ensure_unique_name=False,
num_retries=None, replication_desired=None,
- filename=None, update_time=1.0):
+ filename=None, update_time=1.0, update_collection=None):
self.paths = paths
self.resume = resume
+ self.update_collection = False
self.reporter = reporter
self.bytes_expected = bytes_expected
self.bytes_written = 0
@@ -313,8 +330,21 @@ class ArvPutUploadJob(object):
self._checkpointer = threading.Thread(target=self._update_task)
self._update_task_time = update_time # How many seconds to wait between update runs
self.logger = logging.getLogger('arvados.arv_put')
+ # Load an already existing collection for update
+ if update_collection:
+ if re.match(arvados.util.keep_locator_pattern, update_collection) or re.match(arvados.util.collection_uuid_pattern, update_collection):
+ try:
+ self._collection = arvados.collection.Collection(update_collection)
+ except arvados.errors.ApiError as error:
+ raise CollectionUpdateError("Cannot update collection {} ({})".format(update_collection, error))
+ else:
+ self.update_collection = True
+ self.resume = True
+ else:
+ # Collection locator provided, but unknown format
+ raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
# Load cached data if any and if needed
- self._setup_state()
+ self._setup_state(update_collection)
def start(self):
"""
@@ -345,10 +375,13 @@ class ArvPutUploadJob(object):
def save_collection(self):
with self._collection_lock:
- self._my_collection().save_new(
- name=self.name, owner_uuid=self.owner_uuid,
- ensure_unique_name=self.ensure_unique_name,
- num_retries=self.num_retries)
+ if self.update_collection:
+ self._my_collection().save(num_retries = self.num_retries)
+ else:
+ self._my_collection().save_new(
+ name=self.name, owner_uuid=self.owner_uuid,
+ ensure_unique_name=self.ensure_unique_name,
+ num_retries=self.num_retries)
def destroy_cache(self):
if self.resume:
@@ -433,18 +466,23 @@ class ArvPutUploadJob(object):
except IOError:
# Not found
file_in_collection = None
- # If no previous cached data on this file, store it for an eventual
- # repeated run.
- if source not in self._state['files']:
- with self._state_lock:
+ with self._state_lock:
+ # If no previous cached data on this file, store it for an eventual
+ # repeated run.
+ if source not in self._state['files']:
self._state['files'][source] = {
'mtime': os.path.getmtime(source),
'size' : os.path.getsize(source)
}
- with self._state_lock:
cached_file_data = self._state['files'][source]
# See if this file was already uploaded at least partially
if file_in_collection:
+ # If the file already exists and we are updating the collection, ignore it
+ # even if it's different from the local one.
+ if self.update_collection:
+ self.bytes_skipped += file_in_collection.size()
+ return
+
if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
if cached_file_data['size'] == file_in_collection.size():
# File already there, skip it.
@@ -502,13 +540,15 @@ class ArvPutUploadJob(object):
replication_desired=self.replication_desired)
return self._collection
- def _setup_state(self):
+ def _setup_state(self, update_collection=None):
"""
Create a new cache file or load a previously existing one.
"""
if self.resume:
md5 = hashlib.md5()
md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
+ if update_collection:
+ md5.update(update_collection)
realpaths = sorted(os.path.realpath(path) for path in self.paths)
md5.update('\0'.join(realpaths))
if self.filename:
@@ -716,19 +756,24 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
replication_desired = args.replication,
name = collection_name,
owner_uuid = project_uuid,
- ensure_unique_name = True)
+ ensure_unique_name = True,
+ update_collection = args.update_collection)
except ResumeCacheConflict:
print >>stderr, "\n".join([
"arv-put: Another process is already uploading this data.",
" Use --no-resume if this is really what you want."])
sys.exit(1)
+ except CollectionUpdateError as error:
+ print >>stderr, "\n".join([
+ "arv-put: %s" % str(error)])
+ sys.exit(1)
# Install our signal handler for each code in CAUGHT_SIGNALS, and save
# the originals.
orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
for sigcode in CAUGHT_SIGNALS}
- if args.resume and writer.bytes_written > 0:
+ if not args.update_collection and args.resume and writer.bytes_written > 0:
print >>stderr, "\n".join([
"arv-put: Resuming previous upload from last checkpoint.",
" Use the --no-resume option to start over."])
@@ -749,7 +794,10 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
else:
try:
writer.save_collection()
- print >>stderr, "Collection saved as '%s'" % writer.collection_name()
+ if args.update_collection:
+ print >>stderr, "Collection updated: '{}'".format(writer.collection_name())
+ else:
+ print >>stderr, "Collection saved as '{}'".format(writer.collection_name())
if args.portable_data_hash:
output = writer.portable_data_hash()
else:
diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
index 7a0120c..898349c 100644
--- a/sdk/python/tests/test_arv_put.py
+++ b/sdk/python/tests/test_arv_put.py
@@ -240,6 +240,7 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
ArvadosBaseTestCase):
+
def setUp(self):
super(ArvPutUploadJobTest, self).setUp()
run_test_server.authorize_with('active')
@@ -624,6 +625,21 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
self.assertEqual(1, len(collection_list))
return collection_list[0]
+ def test_put_collection_with_later_update(self):
+ tmpdir = self.make_tmpdir()
+ with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+ f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+ col = self.run_and_find_collection("", ['--no-progress', tmpdir])
+ self.assertNotEqual(None, col['uuid'])
+ # Add a new file to the directory
+ with open(os.path.join(tmpdir, 'file2'), 'w') as f:
+ f.write('The quick brown fox jumped over the lazy dog')
+ updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
+ self.assertEqual(col['uuid'], updated_col['uuid'])
+ # Get the manifest and check that the new file is being included
+ c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
+ self.assertRegexpMatches(c['manifest_text'], r'^\. .*:44:file2\n')
+
def test_put_collection_with_high_redundancy(self):
# Write empty data: we're not testing CollectionWriter, just
# making sure collections.create tells the API server what our
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list