[ARVADOS] updated: 3a1f0b616382230c32439b8d7e17ff75a132a10d
Git user
git at public.curoverse.com
Fri Aug 5 14:16:32 EDT 2016
Summary of changes:
sdk/python/arvados/commands/put.py | 18 ++++----
sdk/python/tests/test_arv_put.py | 85 +++++++++++---------------------------
2 files changed, 35 insertions(+), 68 deletions(-)
via 3a1f0b616382230c32439b8d7e17ff75a132a10d (commit)
from e69ce852e1cbbe5bab82e32ec5d1874ef5a768f3 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 3a1f0b616382230c32439b8d7e17ff75a132a10d
Author: Lucas Di Pentima <lucas at curoverse.com>
Date: Fri Aug 5 15:14:04 2016 -0300
9463: On failure, commit the cache file too before exiting. Wrote the missing test covering a failed partial upload followed by resuming, using a mock function instead of injecting code manually.
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index b2c40f1..43e3813 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -334,12 +334,13 @@ class ArvPutUploadJob(object):
# Stop the thread before doing anything else
self._stop_checkpointer.set()
self._checkpointer.join()
- # Successful upload, one last _update()
- self._update()
- if self.resume:
- self._cache_file.close()
- # Correct the final written bytes count
- self.bytes_written -= self.bytes_skipped
+ # Commit all & one last _update()
+ self.manifest_text()
+ self._update()
+ if self.resume:
+ self._cache_file.close()
+ # Correct the final written bytes count
+ self.bytes_written -= self.bytes_skipped
def save_collection(self):
with self._collection_lock:
@@ -387,7 +388,8 @@ class ArvPutUploadJob(object):
# Update cache, if resume enabled
if self.resume:
with self._state_lock:
- self._state['manifest'] = self._my_collection().manifest_text()
+ # Get the manifest text without committing pending blocks
+ self._state['manifest'] = self._my_collection()._get_manifest_text(".", strip=False, normalize=False)
if self.resume:
self._save_state()
# Call the reporter, if any
@@ -470,7 +472,7 @@ class ArvPutUploadJob(object):
output = self._my_collection().open(filename, 'w')
self._write(source_fd, output)
output.close()
-
+
def _write(self, source_fd, output):
while True:
data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
index 7d8d790..52063f2 100644
--- a/sdk/python/tests/test_arv_put.py
+++ b/sdk/python/tests/test_arv_put.py
@@ -21,6 +21,7 @@ from cStringIO import StringIO
import arvados
import arvados.commands.put as arv_put
+import arvados_testutil as tutil
from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
import run_test_server
@@ -242,8 +243,6 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
def setUp(self):
super(ArvPutUploadJobTest, self).setUp()
run_test_server.authorize_with('active')
- self.exit_lock = threading.Lock()
- self.save_manifest_lock = threading.Lock()
# Temp files creation
self.tempdir = tempfile.mkdtemp()
subdir = os.path.join(self.tempdir, 'subdir')
@@ -254,7 +253,7 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
f.write(data * i)
with open(os.path.join(subdir, 'otherfile'), 'w') as f:
f.write(data * 5)
- # For large file resuming test
+ # Large temp file for resume test
_, self.large_file_name = tempfile.mkstemp()
fileobj = open(self.large_file_name, 'w')
# Make sure to write just a little more than one block
@@ -262,6 +261,7 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
fileobj.write(data)
fileobj.close()
+ self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
def tearDown(self):
super(ArvPutUploadJobTest, self).tearDown()
@@ -311,64 +311,29 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)
def test_resume_large_file_upload(self):
- # Proxying ArvadosFile.writeto() method to be able to synchronize it
- # with partial manifest saves
- orig_writeto_func = getattr(arvados.arvfile.ArvadosFile, 'writeto')
- orig_update_func = getattr(arv_put.ArvPutUploadJob, '_update')
- def wrapped_update(*args, **kwargs):
- job_instance = args[0]
- orig_update_func(*args, **kwargs)
- with self.save_manifest_lock:
- # Allow abnormal termination when first block written
- if job_instance._collection_size(job_instance._my_collection()) == arvados.config.KEEP_BLOCK_SIZE:
- self.exit_lock.release()
- def wrapped_writeto(*args, **kwargs):
- data = args[2]
+ def wrapped_write(*args, **kwargs):
+ data = args[1]
+ # Exit only on last block
if len(data) < arvados.config.KEEP_BLOCK_SIZE:
- # Lock on the last block write call, waiting for the
- # manifest to be saved
- with self.exit_lock:
- raise SystemExit('Test exception')
- ret = orig_writeto_func(*args, **kwargs)
- self.save_manifest_lock.release()
- return ret
- setattr(arvados.arvfile.ArvadosFile, 'writeto', wrapped_writeto)
- setattr(arv_put.ArvPutUploadJob, '_update', wrapped_update)
- # MD5 hash of random data to be uploaded
- md5_original = hashlib.md5()
- with open(self.large_file_name, 'r') as f:
- data = f.read()
- md5_original.update(data)
- self.exit_lock.acquire()
- self.save_manifest_lock.acquire()
- writer = arv_put.ArvPutUploadJob([self.large_file_name],
- update_time=0.1)
- # First upload: partially completed with simulated error
- try:
- self.assertRaises(SystemExit, writer.start())
- except SystemExit:
- # Avoid getting a ResumeCacheConflict on the 2nd run
- writer._cache_file.close()
- self.assertGreater(writer.bytes_written, 0)
- self.assertLess(writer.bytes_written,
- os.path.getsize(self.large_file_name))
-
- # Restore the ArvadosFile.writeto() method to before retrying
- setattr(arvados.arvfile.ArvadosFile, 'writeto', orig_writeto_func)
- # Restore the ArvPutUploadJob._update() method to before retrying
- setattr(arv_put.ArvPutUploadJob, '_update', orig_update_func)
- writer_new = arv_put.ArvPutUploadJob([self.large_file_name])
- writer_new.start()
- writer_new.destroy_cache()
- self.assertEqual(os.path.getsize(self.large_file_name),
- writer.bytes_written + writer_new.bytes_written)
- # Read the uploaded file to compare its md5 hash
- md5_uploaded = hashlib.md5()
- c = arvados.collection.Collection(writer_new.manifest_text())
- with c.open(os.path.basename(self.large_file_name), 'r') as f:
- new_data = f.read()
- md5_uploaded.update(new_data)
- self.assertEqual(md5_original.hexdigest(), md5_uploaded.hexdigest())
+ raise SystemExit("Simulated error")
+ return self.arvfile_write(*args, **kwargs)
+
+ with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+ autospec=True) as mocked_write:
+ mocked_write.side_effect = wrapped_write
+ writer = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1)
+ with self.assertRaises(SystemExit):
+ writer.start()
+ self.assertLess(writer.bytes_written,
+ os.path.getsize(self.large_file_name))
+ # Retry the upload
+ writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1)
+ writer2.start()
+ self.assertEqual(writer.bytes_written + writer2.bytes_written,
+ os.path.getsize(self.large_file_name))
+ writer2.destroy_cache()
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list