[ARVADOS] updated: 8be35986ae62ce39691c4c6d491601012255b1ba
Git user
git at public.curoverse.com
Fri Dec 9 17:28:59 EST 2016
Summary of changes:
sdk/python/arvados/arvfile.py | 8 ++++
sdk/python/arvados/commands/put.py | 49 +++++++++++++++----------
sdk/python/tests/test_arv_put.py | 75 ++++++++++++++++++++++++++++++++++----
sdk/python/tests/test_arvfile.py | 21 +++++++++++
4 files changed, 125 insertions(+), 28 deletions(-)
via 8be35986ae62ce39691c4c6d491601012255b1ba (commit)
via 1c96d4eb740dc4285d9f30ba0ce73499f7b7d59e (commit)
via a62e41101a75e01f9b0dd7124eef81714443d8a1 (commit)
via a175a62538e4db86296e49e5412be41e026e73b6 (commit)
via 8303a85b6c3fed92710a590fa5653557b73439cf (commit)
via 90de410ff977f3669d33f532adf0cb71ac8ac261 (commit)
from 2225be2cb92613495450528ba24a9ca14f232748 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 8be35986ae62ce39691c4c6d491601012255b1ba
Author: Lucas Di Pentima <lucas at curoverse.com>
Date: Fri Dec 9 19:27:19 2016 -0300
10383: Added 2 tests to check that no resuming takes place when using either --no-resume or --no-cache.
diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
index f42c0fc..0c1d377 100644
--- a/sdk/python/tests/test_arv_put.py
+++ b/sdk/python/tests/test_arv_put.py
@@ -324,6 +324,7 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
replication_desired=1)
with self.assertRaises(SystemExit):
writer.start(save_collection=False)
+ # Confirm that the file was partially uploaded
self.assertGreater(writer.bytes_written, 0)
self.assertLess(writer.bytes_written,
os.path.getsize(self.large_file_name))
@@ -335,6 +336,65 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
os.path.getsize(self.large_file_name))
writer2.destroy_cache()
+ def test_no_resume_when_asked(self):
+ def wrapped_write(*args, **kwargs):
+ data = args[1]
+ # Exit only on last block
+ if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+ raise SystemExit("Simulated error")
+ return self.arvfile_write(*args, **kwargs)
+
+ with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+ autospec=True) as mocked_write:
+ mocked_write.side_effect = wrapped_write
+ writer = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1)
+ with self.assertRaises(SystemExit):
+ writer.start(save_collection=False)
+ # Confirm that the file was partially uploaded
+ self.assertGreater(writer.bytes_written, 0)
+ self.assertLess(writer.bytes_written,
+ os.path.getsize(self.large_file_name))
+ # Retry the upload, this time without resume
+ writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1,
+ resume=False)
+ writer2.start(save_collection=False)
+ self.assertEqual(writer2.bytes_skipped, 0)
+ self.assertEqual(writer2.bytes_written,
+ os.path.getsize(self.large_file_name))
+ writer2.destroy_cache()
+
+ def test_no_resume_when_no_cache(self):
+ def wrapped_write(*args, **kwargs):
+ data = args[1]
+ # Exit only on last block
+ if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+ raise SystemExit("Simulated error")
+ return self.arvfile_write(*args, **kwargs)
+
+ with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+ autospec=True) as mocked_write:
+ mocked_write.side_effect = wrapped_write
+ writer = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1)
+ with self.assertRaises(SystemExit):
+ writer.start(save_collection=False)
+ # Confirm that the file was partially uploaded
+ self.assertGreater(writer.bytes_written, 0)
+ self.assertLess(writer.bytes_written,
+ os.path.getsize(self.large_file_name))
+ # Retry the upload, this time without cache usage
+ writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1,
+ resume=False,
+ use_cache=False)
+ writer2.start(save_collection=False)
+ self.assertEqual(writer2.bytes_skipped, 0)
+ self.assertEqual(writer2.bytes_written,
+ os.path.getsize(self.large_file_name))
+ writer2.destroy_cache()
+
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
TEST_SIZE = os.path.getsize(__file__)
commit 1c96d4eb740dc4285d9f30ba0ce73499f7b7d59e
Author: Lucas Di Pentima <lucas at curoverse.com>
Date: Fri Dec 9 19:26:51 2016 -0300
10383: Exception usage correction
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 8995ea9..88956cd 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -222,6 +222,8 @@ class CollectionUpdateError(Exception):
class ResumeCacheConflict(Exception):
pass
+class ArvPutArgumentConflict(Exception):
+ pass
class ResumeCache(object):
CACHE_DIR = '.cache/arvados/arv-put'
@@ -345,7 +347,7 @@ class ArvPutUploadJob(object):
self.logger = logging.getLogger('arvados.arv_put')
if not self.use_cache and self.resume:
- raise ArgumentError('resume cannot be True when use_cache is False')
+ raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
# Load cached data if any and if needed
self._setup_state(update_collection)
commit a62e41101a75e01f9b0dd7124eef81714443d8a1
Author: Lucas Di Pentima <lucas at curoverse.com>
Date: Fri Dec 9 18:36:29 2016 -0300
10383: Refactored the file upload decision code so that it first skims through
the entire list, instead of deciding on a file-by-file basis. This allows
pre-calculating how many bytes are going to be skipped when resuming, and that
allows for a precise way of showing the upload progress report.
Also updated affected tests so they pass with this new way of counting bytes.
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 3e82bdf..8995ea9 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -341,6 +341,7 @@ class ArvPutUploadJob(object):
self._stop_checkpointer = threading.Event()
self._checkpointer = threading.Thread(target=self._update_task)
self._update_task_time = update_time # How many seconds wait between update runs
+ self._files_to_upload = []
self.logger = logging.getLogger('arvados.arv_put')
if not self.use_cache and self.resume:
@@ -371,11 +372,16 @@ class ArvPutUploadJob(object):
dirs.sort()
files.sort()
for f in files:
- self._write_file(os.path.join(root, f),
+ self._check_file(os.path.join(root, f),
os.path.join(root[len(prefixdir):], f))
else:
- self._write_file(os.path.abspath(path),
+ self._check_file(os.path.abspath(path),
self.filename or os.path.basename(path))
+ # Update bytes_written from current local collection and
+ # report initial progress.
+ self._update()
+ # Actual file upload
+ self._upload_files()
finally:
# Stop the thread before doing anything else
self._stop_checkpointer.set()
@@ -387,8 +393,6 @@ class ArvPutUploadJob(object):
self._cache_file.close()
if save_collection:
self.save_collection()
- # Correct the final written bytes count
- self.bytes_written -= self.bytes_skipped
def save_collection(self):
if self.update:
@@ -468,11 +472,11 @@ class ArvPutUploadJob(object):
self._write(sys.stdin, output)
output.close()
- def _write_file(self, source, filename):
+ def _check_file(self, source, filename):
+ """Check if this file needs to be uploaded"""
resume_offset = 0
should_upload = False
new_file_in_cache = False
-
# Record file path for updating the remote collection before exiting
self._file_paths.append(filename)
@@ -505,22 +509,31 @@ class ArvPutUploadJob(object):
# Permission token expired, re-upload file. This will change whenever
# we have a API for refreshing tokens.
should_upload = True
+ self._local_collection.remove(filename)
elif cached_file_data['size'] == file_in_local_collection.size():
# File already there, skip it.
self.bytes_skipped += cached_file_data['size']
elif cached_file_data['size'] > file_in_local_collection.size():
# File partially uploaded, resume!
resume_offset = file_in_local_collection.size()
+ self.bytes_skipped += resume_offset
should_upload = True
else:
# Inconsistent cache, re-upload the file
should_upload = True
+ self._local_collection.remove(filename)
self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
# Local file differs from cached data, re-upload it.
else:
+ if file_in_local_collection:
+ self._local_collection.remove(filename)
should_upload = True
if should_upload:
+ self._files_to_upload.append((source, resume_offset, filename))
+
+ def _upload_files(self):
+ for source, resume_offset, filename in self._files_to_upload:
with open(source, 'r') as source_fd:
with self._state_lock:
self._state['files'][source]['mtime'] = os.path.getmtime(source)
@@ -529,7 +542,6 @@ class ArvPutUploadJob(object):
# Start upload where we left off
output = self._local_collection.open(filename, 'a')
source_fd.seek(resume_offset)
- self.bytes_skipped += resume_offset
else:
# Start from scratch
output = self._local_collection.open(filename, 'w')
@@ -594,9 +606,6 @@ class ArvPutUploadJob(object):
self._state = copy.deepcopy(self.EMPTY_STATE)
# Load the previous manifest so we can check if files were modified remotely.
self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired)
- # Load how many bytes were uploaded on previous run
- with self._collection_lock:
- self.bytes_written = self._collection_size(self._local_collection)
def _lock_file(self, fileobj):
try:
diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
index fee32a3..f42c0fc 100644
--- a/sdk/python/tests/test_arv_put.py
+++ b/sdk/python/tests/test_arv_put.py
@@ -278,12 +278,12 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
f.flush()
cwriter = arv_put.ArvPutUploadJob([f.name])
cwriter.start(save_collection=False)
- self.assertEqual(3, cwriter.bytes_written)
+ self.assertEqual(3, cwriter.bytes_written - cwriter.bytes_skipped)
# Don't destroy the cache, and start another upload
cwriter_new = arv_put.ArvPutUploadJob([f.name])
cwriter_new.start(save_collection=False)
cwriter_new.destroy_cache()
- self.assertEqual(0, cwriter_new.bytes_written)
+ self.assertEqual(0, cwriter_new.bytes_written - cwriter_new.bytes_skipped)
def make_progress_tester(self):
progression = []
@@ -324,13 +324,14 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
replication_desired=1)
with self.assertRaises(SystemExit):
writer.start(save_collection=False)
- self.assertLess(writer.bytes_written,
- os.path.getsize(self.large_file_name))
+ self.assertGreater(writer.bytes_written, 0)
+ self.assertLess(writer.bytes_written,
+ os.path.getsize(self.large_file_name))
# Retry the upload
writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
replication_desired=1)
writer2.start(save_collection=False)
- self.assertEqual(writer.bytes_written + writer2.bytes_written,
+ self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
os.path.getsize(self.large_file_name))
writer2.destroy_cache()
commit a175a62538e4db86296e49e5412be41e026e73b6
Author: Lucas Di Pentima <lucas at curoverse.com>
Date: Thu Dec 8 17:56:44 2016 -0300
10383: Check for expired tokens when getting already uploaded files.
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 610fd7d..eadb3a9 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -760,6 +760,14 @@ class ArvadosFile(object):
return self.parent.writable()
@synchronized
+ def permission_expired(self, as_of_dt=None):
+ """Returns True if any of the segment's locators is expired"""
+ for r in self._segments:
+ if KeepLocator(r.locator).permission_expired(as_of_dt):
+ return True
+ return False
+
+ @synchronized
def segments(self):
return copy.copy(self._segments)
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index e5c85a9..3e82bdf 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -501,6 +501,10 @@ class ArvPutUploadJob(object):
if not file_in_local_collection:
# File not uploaded yet, upload it completely
should_upload = True
+ elif file_in_local_collection.permission_expired():
+ # Permission token expired, re-upload file. This will change whenever
+ # we have a API for refreshing tokens.
+ should_upload = True
elif cached_file_data['size'] == file_in_local_collection.size():
# File already there, skip it.
self.bytes_skipped += cached_file_data['size']
diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py
index 6b35626..8f02d51 100644
--- a/sdk/python/tests/test_arvfile.py
+++ b/sdk/python/tests/test_arvfile.py
@@ -1,6 +1,7 @@
#!/usr/bin/env python
import bz2
+import datetime
import gzip
import io
import mock
@@ -570,6 +571,26 @@ class ArvadosFileReadlinesTestCase(ArvadosFileReadTestCase):
def read_for_test(self, reader, byte_count, **kwargs):
return ''.join(reader.readlines(**kwargs))
+
+class ArvadosFileTestCase(unittest.TestCase):
+ def datetime_to_hex(self, dt):
+ return hex(int(time.mktime(dt.timetuple())))[2:]
+
+ def test_permission_expired(self):
+ base_manifest = ". 781e5e245d69b566979b86e28d23f2c7+10+A715fd31f8111894f717eb1003c1b0216799dd9ec@{} 0:10:count.txt\n"
+ now = datetime.datetime.now()
+ a_week_ago = now - datetime.timedelta(days=7)
+ a_month_ago = now - datetime.timedelta(days=30)
+ a_week_from_now = now + datetime.timedelta(days=7)
+ with Collection(base_manifest.format(self.datetime_to_hex(a_week_from_now))) as c:
+ self.assertFalse(c.find('count.txt').permission_expired())
+ with Collection(base_manifest.format(self.datetime_to_hex(a_week_ago))) as c:
+ f = c.find('count.txt')
+ self.assertTrue(f.permission_expired())
+ self.assertTrue(f.permission_expired(a_week_from_now))
+ self.assertFalse(f.permission_expired(a_month_ago))
+
+
class BlockManagerTest(unittest.TestCase):
def test_bufferblock_append(self):
keep = ArvadosFileWriterTestCase.MockKeep({})
commit 8303a85b6c3fed92710a590fa5653557b73439cf
Author: Lucas Di Pentima <lucas at curoverse.com>
Date: Thu Dec 8 13:27:20 2016 -0300
10383: Changed the time between updates from 1 to 20 secs
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 8f7a7b7..e5c85a9 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -315,7 +315,7 @@ class ArvPutUploadJob(object):
def __init__(self, paths, resume=True, use_cache=True, reporter=None,
bytes_expected=None, name=None, owner_uuid=None,
ensure_unique_name=False, num_retries=None, replication_desired=None,
- filename=None, update_time=1.0, update_collection=None):
+ filename=None, update_time=20.0, update_collection=None):
self.paths = paths
self.resume = resume
self.use_cache = use_cache
commit 90de410ff977f3669d33f532adf0cb71ac8ac261
Author: Lucas Di Pentima <lucas at curoverse.com>
Date: Thu Dec 8 13:25:06 2016 -0300
10383: Removed deprecated --max-manifest-depth argument from the help message and ignore its value in make_path.
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index c7de888..8f7a7b7 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -40,13 +40,7 @@ Local file or directory. Default: read from standard input.
_group = upload_opts.add_mutually_exclusive_group()
_group.add_argument('--max-manifest-depth', type=int, metavar='N',
- default=-1, help="""
-Maximum depth of directory tree to represent in the manifest
-structure. A directory structure deeper than this will be represented
-as a single stream in the manifest. If N=0, the manifest will contain
-a single stream. Default: -1 (unlimited), i.e., exactly one manifest
-stream per filesystem directory that contains files.
-""")
+ default=-1, help=argparse.SUPPRESS)
_group.add_argument('--normalize', action='store_true',
help="""
@@ -244,7 +238,7 @@ class ResumeCache(object):
realpaths = sorted(os.path.realpath(path) for path in args.paths)
md5.update('\0'.join(realpaths))
if any(os.path.isdir(path) for path in realpaths):
- md5.update(str(max(args.max_manifest_depth, -1)))
+ md5.update("-1")
elif args.filename:
md5.update(args.filename)
return os.path.join(
diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
index e4b2724..fee32a3 100644
--- a/sdk/python/tests/test_arv_put.py
+++ b/sdk/python/tests/test_arv_put.py
@@ -31,9 +31,7 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
[],
['/dev/null'],
['/dev/null', '--filename', 'empty'],
- ['/tmp'],
- ['/tmp', '--max-manifest-depth', '0'],
- ['/tmp', '--max-manifest-depth', '1']
+ ['/tmp']
]
def tearDown(self):
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list