[ARVADOS] updated: 3885bc2e043ce123929acceaaa8dbbf20dbb8c12
From: Git user <git at public.curoverse.com>
Date: Fri Nov 11 21:42:33 EST 2016
Summary of changes:
 sdk/python/arvados/collection.py     |  11 ++-
 sdk/python/arvados/commands/put.py   | 165 +++++++++++++++++++++++------------
 sdk/python/tests/test_arv_put.py     |  14 +--
 sdk/python/tests/test_collections.py |   2 +
 4 files changed, 127 insertions(+), 65 deletions(-)
via 3885bc2e043ce123929acceaaa8dbbf20dbb8c12 (commit)
via 3dec7045b24acdd53dc054bddcfd4c7f77739f00 (commit)
via 845d6e6521b284a111534447919a6bc594573ee1 (commit)
from 391a1d8378b4bc6b17b71904b3d6494160b51627 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 3885bc2e043ce123929acceaaa8dbbf20dbb8c12
Author: Lucas Di Pentima <lucas at curoverse.com>
Date: Fri Nov 11 21:37:30 2016 -0300
10383: Tests have been adapted to the new behaviour.
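The "new behaviour" is the start(save_collection=...) flag introduced in the
second commit below: tests that only exercise uploading now pass
save_collection=False so that no collection record is created. A minimal
sketch of the adapted call pattern (the path is hypothetical):

    import arvados.commands.put as arv_put

    # Upload the file data without saving a collection record.
    cwriter = arv_put.ArvPutUploadJob(['/tmp/example.txt'])  # hypothetical path
    cwriter.start(save_collection=False)
    print cwriter.manifest_text()  # the manifest is still available
    cwriter.destroy_cache()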
diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
index 898349c..e4b2724 100644
--- a/sdk/python/tests/test_arv_put.py
+++ b/sdk/python/tests/test_arv_put.py
@@ -271,7 +271,7 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
 
     def test_writer_works_without_cache(self):
         cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
-        cwriter.start()
+        cwriter.start(save_collection=False)
         self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
 
     def test_writer_works_with_cache(self):
@@ -279,11 +279,11 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
             f.write('foo')
             f.flush()
             cwriter = arv_put.ArvPutUploadJob([f.name])
-            cwriter.start()
+            cwriter.start(save_collection=False)
             self.assertEqual(3, cwriter.bytes_written)
             # Don't destroy the cache, and start another upload
             cwriter_new = arv_put.ArvPutUploadJob([f.name])
-            cwriter_new.start()
+            cwriter_new.start(save_collection=False)
             cwriter_new.destroy_cache()
             self.assertEqual(0, cwriter_new.bytes_written)
@@ -301,13 +301,13 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                 progression, reporter = self.make_progress_tester()
                 cwriter = arv_put.ArvPutUploadJob([f.name],
                                                   reporter=reporter, bytes_expected=expect_count)
-                cwriter.start()
+                cwriter.start(save_collection=False)
                 cwriter.destroy_cache()
                 self.assertIn((3, expect_count), progression)
 
     def test_writer_upload_directory(self):
         cwriter = arv_put.ArvPutUploadJob([self.tempdir])
-        cwriter.start()
+        cwriter.start(save_collection=False)
         cwriter.destroy_cache()
         self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)
@@ -325,13 +325,13 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
         writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
         with self.assertRaises(SystemExit):
-            writer.start()
+            writer.start(save_collection=False)
         self.assertLess(writer.bytes_written,
                         os.path.getsize(self.large_file_name))
         # Retry the upload
         writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                           replication_desired=1)
-        writer2.start()
+        writer2.start(save_collection=False)
         self.assertEqual(writer.bytes_written + writer2.bytes_written,
                          os.path.getsize(self.large_file_name))
         writer2.destroy_cache()
commit 3dec7045b24acdd53dc054bddcfd4c7f77739f00
Author: Lucas Di Pentima <lucas at curoverse.com>
Date: Fri Nov 11 20:38:02 2016 -0300
10383: Enhancements/fixes to the collection update feature (a usage sketch follows this list):
* The cache file now stores the collection UUID when uploading for the first time.
* On a successful initial upload the cache file is not deleted, so it can be used later to update the collection.
* Once the initial collection has been created, the manifest saved in the cache file is used to detect remote changes to a file; a remotely modified file is re-uploaded in full from the local copy.
* If there were no remote changes since the last upload/update, only new files are uploaded and partial uploads are resumed.
* If a new upload (one that would create a new collection) is attempted without deleting the previous cache file, a message alerts the user and the operation is cancelled.
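Taken together, the intended workflow is: a first run creates the collection
and records its UUID in the cache file; a later run with --update-collection
(and a matching cache) updates that same collection. A minimal sketch using
the ArvPutUploadJob API from the diff below (the data path and collection
UUID are hypothetical):

    import arvados.commands.put as arv_put

    # First upload: creates a new collection and stores its UUID in the
    # resume cache file.
    writer = arv_put.ArvPutUploadJob(['/data/project'])
    writer.start(save_collection=True)

    # Later run: update the same collection. The UUID passed here must match
    # the one recorded in the cache, otherwise CollectionUpdateError (or
    # ResumeCacheInvalid, when no update collection is given) is raised.
    updater = arv_put.ArvPutUploadJob(
        ['/data/project'],
        update_collection='zzzzz-4zz18-aaaaaaaaaaaaaaa')  # hypothetical UUID
    updater.start(save_collection=True)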
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index e428eaa..36e9f6c 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -98,9 +98,9 @@ manifest.
 """)
 
 _group.add_argument('--update-collection', type=str, default=None,
-                    dest='update_collection', help="""
+                    dest='update_collection', metavar="UUID", help="""
 Update an existing collection identified by the given Arvados collection
-UUID or manifest block locator. All new local files will be uploaded.
+UUID. All new local files will be uploaded.
 """)
 
 upload_opts.add_argument('--use-filename', type=str, default=None,
@@ -210,6 +210,10 @@ class CollectionUpdateError(Exception):
     pass
 
 
+class ResumeCacheInvalid(Exception):
+    pass
+
+
 class ResumeCacheConflict(Exception):
     pass
 
@@ -300,7 +304,8 @@ class ArvPutUploadJob(object):
     CACHE_DIR = '.cache/arvados/arv-put'
     EMPTY_STATE = {
         'manifest' : None, # Last saved manifest checkpoint
-        'files' : {} # Previous run file list: {path : {size, mtime}}
+        'files' : {}, # Previous run file list: {path : {size, mtime}}
+        'collection_uuid': None, # Saved collection's UUID
     }
 
     def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
@@ -309,7 +314,7 @@
                  filename=None, update_time=1.0, update_collection=None):
         self.paths = paths
         self.resume = resume
-        self.update_collection = False
+        self.update = False
         self.reporter = reporter
         self.bytes_expected = bytes_expected
         self.bytes_written = 0
@@ -326,27 +331,39 @@
         self._cache_file = None
         self._collection = None
         self._collection_lock = threading.Lock()
+        self._local_collection = None # Previous collection manifest, used on update mode.
         self._stop_checkpointer = threading.Event()
         self._checkpointer = threading.Thread(target=self._update_task)
         self._update_task_time = update_time # How many seconds wait between update runs
         self.logger = logging.getLogger('arvados.arv_put')
+
         # Load an already existing collection for update
-        if update_collection:
-            if re.match(arvados.util.keep_locator_pattern, update_collection) or re.match(arvados.util.collection_uuid_pattern, update_collection):
-                try:
-                    self._collection = arvados.collection.Collection(update_collection)
-                except arvados.errors.ApiError as error:
-                    raise CollectionUpdateError("Cannot update collection {} ({})".format(update_collection, error))
-                else:
-                    self.update_collection = True
-                    self.resume = True
+        if update_collection and re.match(arvados.util.collection_uuid_pattern,
+                                          update_collection):
+            try:
+                self._collection = arvados.collection.Collection(update_collection)
+            except arvados.errors.ApiError as error:
+                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
             else:
-                # Collection locator provided, but unknown format
-                raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
+                self.update = True
+                self.resume = True
+        elif update_collection:
+            # Collection locator provided, but unknown format
+            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
         # Load cached data if any and if needed
-        self._setup_state(update_collection)
-
-    def start(self):
+        self._setup_state()
+        if self.update:
+            # Check if cache data belongs to the collection needed to be updated
+            if self._state['collection_uuid'] is None:
+                raise CollectionUpdateError("An initial upload is needed before being able to update the collection {}.".format(update_collection))
+            elif self._state['collection_uuid'] != update_collection:
+                raise CollectionUpdateError("Cached data doesn't belong to collection {}".format(update_collection))
+        elif self.resume:
+            # Check that cache data doesn't belong to an already created collection
+            if self._state['collection_uuid'] is not None:
+                raise ResumeCacheInvalid("Resume cache file '{}' belongs to existing collection {}".format(self._cache_filename, self._state['collection_uuid']))
+
+    def start(self, save_collection):
         """
         Start supporting thread & file uploading
         """
@@ -367,6 +384,10 @@ class ArvPutUploadJob(object):
             self._checkpointer.join()
             # Commit all & one last _update()
             self.manifest_text()
+            if save_collection:
+                self.save_collection()
+            with self._state_lock:
+                self._state['collection_uuid'] = self._my_collection().manifest_locator()
             self._update()
             if self.resume:
                 self._cache_file.close()
@@ -375,7 +396,7 @@
 
     def save_collection(self):
         with self._collection_lock:
-            if self.update_collection:
+            if self.update:
                 self._my_collection().save(num_retries = self.num_retries)
             else:
                 self._my_collection().save_new(
@@ -443,12 +464,15 @@ class ArvPutUploadJob(object):
         # if stream_name == '.':
         #     stream_name = os.path.join('.', os.path.basename(path))
         for item in os.listdir(path):
+            item_col_path = os.path.join(stream_name, item)
             if os.path.isdir(os.path.join(path, item)):
+                self._my_collection().find_or_create(item_col_path,
+                                                     arvados.collection.COLLECTION)
                 self._write_directory_tree(os.path.join(path, item),
-                                           os.path.join(stream_name, item))
+                                           item_col_path)
             else:
                 self._write_file(os.path.join(path, item),
-                                 os.path.join(stream_name, item))
+                                 item_col_path)
 
     def _write_stdin(self, filename):
         with self._collection_lock:
@@ -458,14 +482,9 @@ class ArvPutUploadJob(object):
 
     def _write_file(self, source, filename):
         resume_offset = 0
+        should_upload = True
+
         if self.resume:
-            # Check if file was already uploaded (at least partially)
-            with self._collection_lock:
-                try:
-                    file_in_collection = self._my_collection().find(filename)
-                except IOError:
-                    # Not found
-                    file_in_collection = None
             with self._state_lock:
                 # If no previous cached data on this file, store it for an eventual
                 # repeated run.
@@ -475,29 +494,48 @@
                         'size' : os.path.getsize(source)
                     }
                 cached_file_data = self._state['files'][source]
-            # See if this file was already uploaded at least partially
-            if file_in_collection:
-                # If file already exist and we are updating the collection, ignore it
-                # even if it's different from the local one.
-                if self.update_collection:
-                    self.bytes_skipped += file_in_collection.size()
-                    return
-
-                if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
-                    if cached_file_data['size'] == file_in_collection.size():
-                        # File already there, skip it.
-                        self.bytes_skipped += cached_file_data['size']
-                        return
-                    elif cached_file_data['size'] > file_in_collection.size():
-                        # File partially uploaded, resume!
-                        resume_offset = file_in_collection.size()
-                    else:
-                        # Inconsistent cache, re-upload the file
-                        self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
+            # Check if file was already uploaded (at least partially)
+            with self._collection_lock:
+                file_in_collection = self._my_collection().find(filename)
+                if self.update:
+                    file_in_local_collection = self._local_collection.find(filename)
+            # Decide what to do with this file.
+            should_upload = False
+            if not file_in_collection:
+                should_upload = True
+            elif self.update and file_in_local_collection != file_in_collection:
+                # File remotely modified.
+                should_upload = True
+            # From here, we are certain that the remote file is the same as last uploaded.
+            elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
+                # Local file didn't change from last run.
+                if cached_file_data['size'] == file_in_collection.size():
+                    # File already there, skip it.
+                    self.bytes_skipped += cached_file_data['size']
+                elif cached_file_data['size'] > file_in_collection.size():
+                    # File partially uploaded, resume!
+                    resume_offset = file_in_collection.size()
+                    should_upload = True
                 else:
-                    # Local file differs from cached data, re-upload it
-                    pass
+                    # Inconsistent cache, re-upload the file
+                    should_upload = True
+                    self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
+            elif cached_file_data['mtime'] < os.path.getmtime(source) and cached_file_data['size'] < os.path.getsize(source):
+                # File with appended data since last run
+                resume_offset = file_in_collection.size()
+                should_upload = True
+            else:
+                # Local file differs from cached data, re-upload it
+                should_upload = True
+
+        if should_upload is False:
+            return
+
         with open(source, 'r') as source_fd:
+            if self.resume:
+                with self._state_lock:
+                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
+                    self._state['files'][source]['size'] = os.path.getsize(source)
             if resume_offset > 0:
                 # Start upload where we left off
                 with self._collection_lock:
@@ -540,15 +578,13 @@ class ArvPutUploadJob(object):
                     replication_desired=self.replication_desired)
         return self._collection
 
-    def _setup_state(self, update_collection=None):
+    def _setup_state(self):
         """
         Create a new cache file or load a previously existing one.
         """
         if self.resume:
             md5 = hashlib.md5()
             md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
-            if update_collection:
-                md5.update(update_collection)
             realpaths = sorted(os.path.realpath(path) for path in self.paths)
             md5.update('\0'.join(realpaths))
             if self.filename:
@@ -563,12 +599,17 @@
             with self._state_lock:
                 try:
                     self._state = json.load(self._cache_file)
-                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
+                    if not set(['manifest', 'files', 'collection_uuid']).issubset(set(self._state.keys())):
                         # Cache at least partially incomplete, set up new cache
                         self._state = copy.deepcopy(self.EMPTY_STATE)
                 except ValueError:
                     # Cache file empty, set up new cache
                     self._state = copy.deepcopy(self.EMPTY_STATE)
+
+            # In update mode, load the previous manifest so we can check if files
+            # were modified remotely.
+            if self.update:
+                self._local_collection = arvados.collection.Collection(self._state['manifest'])
             # Load how many bytes were uploaded on previous run
             with self._collection_lock:
                 self.bytes_written = self._collection_size(self._my_collection())
@@ -746,6 +787,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         reporter = None
     bytes_expected = expected_bytes_for(args.paths)
+
     try:
         writer = ArvPutUploadJob(paths = args.paths,
                                  resume = args.resume,
@@ -763,6 +805,12 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
             "arv-put: Another process is already uploading this data.",
            "         Use --no-resume if this is really what you want."])
         sys.exit(1)
+    except ResumeCacheInvalid as error:
+        print >>stderr, "\n".join([
+            "arv-put: %s" % str(error),
+            "         Use --no-resume or delete/move the cache file to upload to a new collection.",
+            "         Use --update-collection otherwise."])
+        sys.exit(1)
     except CollectionUpdateError as error:
         print >>stderr, "\n".join([
             "arv-put: %s" % str(error)])
@@ -780,7 +828,13 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         writer.report_progress()
 
     output = None
-    writer.start()
+    try:
+        writer.start(save_collection=not(args.stream or args.raw))
+    except arvados.errors.ApiError as error:
+        print >>stderr, "\n".join([
+            "arv-put: %s" % str(error)])
+        sys.exit(1)
+
     if args.progress:  # Print newline to split stderr from stdout for humans.
         print >>stderr
@@ -793,7 +847,6 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         output = ','.join(writer.data_locators())
     else:
         try:
-            writer.save_collection()
             if args.update_collection:
                 print >>stderr, "Collection updated: '{}'".format(writer.collection_name())
             else:
@@ -823,7 +876,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         sys.exit(status)
 
     # Success!
-    writer.destroy_cache()
+    #writer.destroy_cache()
 
     return output
commit 845d6e6521b284a111534447919a6bc594573ee1
Author: Lucas Di Pentima <lucas at curoverse.com>
Date: Fri Nov 11 20:03:16 2016 -0300
10383: Fixed Collection.find() so that it also returns None when an intermediate component of the path does not exist. Updated tests to cover more cases.
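After this change, find() distinguishes a missing entry (returns None) from a
malformed path (raises an exception). A short sketch of the expected
behaviour, using a one-file manifest borrowed from the tests below:

    import arvados.collection

    c = arvados.collection.Collection(
        '. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
    assert c.find('count1.txt') is not None            # existing file
    assert c.find('missing.txt') is None               # missing leaf
    assert c.find('missingdir/missing.txt') is None    # missing intermediate part
    # c.find('/count1.txt') raises IOError: the path starts with '/'
    # c.find('') raises arvados.errors.ArgumentError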
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 27aad03..8613f1f 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -565,16 +565,23 @@ class RichCollectionBase(CollectionBase):
 
     def find(self, path):
         """Recursively search the specified file path.
 
-        May return either a Collection or ArvadosFile.  Return None if not
+        May return either a Collection or ArvadosFile. Return None if not
         found.
+        If path is invalid (ex: starts with '/'), an IOError exception will be
+        raised.
 
         """
         if not path:
             raise errors.ArgumentError("Parameter 'path' is empty.")
 
         pathcomponents = path.split("/", 1)
+        if pathcomponents[0] == '':
+            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
+
         item = self._items.get(pathcomponents[0])
-        if len(pathcomponents) == 1:
+        if item is None:
+            return None
+        elif len(pathcomponents) == 1:
             return item
         else:
             if isinstance(item, RichCollectionBase):
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index fc30a24..0e3d5e1 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -861,6 +861,8 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
             c.find("/.")
         with self.assertRaises(arvados.errors.ArgumentError):
             c.find("")
+        self.assertIs(c.find("./nonexistant.txt"), None)
+        self.assertIs(c.find("./nonexistantsubdir/nonexistant.txt"), None)
 
     def test_remove_in_subdir(self):
         c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n')
-----------------------------------------------------------------------
hooks/post-receive
--