[ARVADOS] updated: 1487c0577921c88801c377d753d1322fd1485968
Git user
git at public.curoverse.com
Fri Jul 22 13:52:06 EDT 2016
Summary of changes:
sdk/python/arvados/commands/put.py | 130 ++++++--------------------
sdk/python/tests/test_arv_put.py | 181 ++++++++++++++++++++++---------------
sdk/python/tests/test_arvfile.py | 0
3 files changed, 139 insertions(+), 172 deletions(-)
mode change 100755 => 100644 sdk/python/arvados/commands/put.py
mode change 100755 => 100644 sdk/python/tests/test_arv_put.py
mode change 100755 => 100644 sdk/python/tests/test_arvfile.py
via 1487c0577921c88801c377d753d1322fd1485968 (commit)
from 1c3f80531764931c241cd07a3cbc56892b645bce (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list them in
full below.
commit 1487c0577921c88801c377d753d1322fd1485968
Author: Lucas Di Pentima <lucas at curoverse.com>
Date: Fri Jul 22 14:51:52 2016 -0300
9463: Finished writing tests for ArvPutUploadJob
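The tests in this commit drive the new ArvPutUploadJob class roughly as
follows (a minimal sketch inferred from the diff below; the input path and
callback are placeholders, not part of the commit):

    import arvados.commands.put as arv_put

    def progress(written, expected):
        # Progress callback: bytes written so far vs. total expected (may be None).
        print('%d / %s bytes' % (written, expected))

    writer = arv_put.ArvPutUploadJob(['/tmp/example.dat'],  # hypothetical path
                                     reporter=progress,
                                     update_time=60.0)  # checkpoint interval, seconds
    writer.start()            # upload, checkpointing resume state periodically
    writer.save_collection()  # create the collection via the API
    writer.destroy_cache()    # drop the resume cache after a successful run
    print(writer.manifest_text())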
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
old mode 100755
new mode 100644
index d2c5214..31cb0cb
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -196,10 +196,6 @@ class ResumeCacheConflict(Exception):
pass
-class FileUploadError(Exception):
- pass
-
-
class ResumeCache(object):
CACHE_DIR = '.cache/arvados/arv-put'
@@ -283,15 +279,18 @@ class ResumeCache(object):
class ArvPutUploadJob(object):
+ CACHE_DIR = '.cache/arvados/arv-put'
+
def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
- name=None, owner_uuid=None, ensure_unique_name=False,
- num_retries=None, write_copies=None, replication=None,
- filename=None):
+ name=None, owner_uuid=None, ensure_unique_name=False,
+ num_retries=None, write_copies=None, replication=None,
+ filename=None, update_time=60.0):
self.paths = paths
self.resume = resume
self.reporter = reporter
self.bytes_expected = bytes_expected
self.bytes_written = 0
+ self.bytes_skipped = 0
self.name = name
self.owner_uuid = owner_uuid
self.ensure_unique_name = ensure_unique_name
@@ -302,13 +301,12 @@ class ArvPutUploadJob(object):
self._state_lock = threading.Lock()
self._state = None # Previous run state (file list & manifest)
self._current_files = [] # Current run file list
- self._cache_hash = None # MD5 digest based on paths & filename
self._cache_file = None
self._collection = None
self._collection_lock = threading.Lock()
self._stop_checkpointer = threading.Event()
self._checkpointer = threading.Thread(target=self._update_task)
- self._update_task_time = 60.0 # How many seconds wait between update runs
+ self._update_task_time = update_time # Seconds to wait between update runs
# Load cached data if any and if needed
self._setup_state()
@@ -324,16 +322,18 @@ class ArvPutUploadJob(object):
self._write_stdin(self.filename or 'stdin')
elif os.path.isdir(path):
self._write_directory_tree(path)
- else: #if os.path.isfile(path):
+ else:
self._write_file(path, self.filename or os.path.basename(path))
- # else:
- # raise FileUploadError('Inadequate file type, cannot upload: %s' % path)
finally:
# Stop the thread before doing anything else
self._stop_checkpointer.set()
self._checkpointer.join()
# Successful upload, one last _update()
self._update()
+ if self.resume:
+ self._cache_file.close()
+ # Correct the final written bytes count
+ self.bytes_written -= self.bytes_skipped
def save_collection(self):
with self._collection_lock:
@@ -342,10 +342,11 @@ class ArvPutUploadJob(object):
ensure_unique_name=self.ensure_unique_name,
num_retries=self.num_retries,
replication_desired=self.replication)
+
+ def destroy_cache(self):
if self.resume:
- # Delete cache file upon successful collection saving
try:
- os.unlink(self._cache_file.name)
+ os.unlink(self._cache_filename)
except OSError as error:
if error.errno != errno.ENOENT: # That's what we wanted anyway.
raise
@@ -357,7 +358,7 @@ class ArvPutUploadJob(object):
"""
size = 0
for item in collection.values():
- if isinstance(item, arvados.collection.Collection):
+ if isinstance(item, (arvados.collection.Collection, arvados.collection.Subcollection)):
size += self._collection_size(item)
else:
size += item.size()
@@ -404,11 +405,9 @@ class ArvPutUploadJob(object):
if os.path.isdir(os.path.join(path, item)):
self._write_directory_tree(os.path.join(path, item),
os.path.join(stream_name, item))
- elif os.path.isfile(os.path.join(path, item)):
+ else:
self._write_file(os.path.join(path, item),
os.path.join(stream_name, item))
- else:
- raise FileUploadError('Inadequate file type, cannot upload: %s' % path)
def _write_stdin(self, filename):
with self._collection_lock:
@@ -441,6 +440,7 @@ class ArvPutUploadJob(object):
if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
if os.path.getsize(source) == file_in_collection.size():
# File already there, skip it.
+ self.bytes_skipped += os.path.getsize(source)
return
elif os.path.getsize(source) > file_in_collection.size():
# File partially uploaded, resume!
@@ -458,6 +458,7 @@ class ArvPutUploadJob(object):
# Open for appending
output = self._my_collection().open(filename, 'a')
source_fd.seek(resume_offset)
+ self.bytes_skipped += resume_offset
else:
with self._collection_lock:
output = self._my_collection().open(filename, 'w')
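Taken together, the hunks above give _write_file() three resume cases. A
condensed sketch of the decision, as a standalone function (names follow the
diff; source is the local path, file_in_collection the copy in the collection):

    import os

    def resume_action(source, file_in_collection, cached_mtime, cached_size):
        """Decide how to handle one file on a resumed run (sketch)."""
        local_size = os.path.getsize(source)
        if (cached_mtime == os.path.getmtime(source)
                and cached_size == local_size):
            if local_size == file_in_collection.size():
                return 'skip', local_size  # fully uploaded on a previous run
            elif local_size > file_in_collection.size():
                return 'append', file_in_collection.size()  # resume offset
        return 'rewrite', 0  # changed or unknown file: start over ('w' mode)

Skipped and resumed bytes are accumulated in bytes_skipped, and start()
subtracts them at the end, so bytes_written reports only the bytes actually
transferred during this run -- which is what test_writer_works_with_cache
below asserts (0 bytes on the second upload of an unchanged file).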
@@ -498,12 +499,14 @@ class ArvPutUploadJob(object):
md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
realpaths = sorted(os.path.realpath(path) for path in self.paths)
md5.update('\0'.join(realpaths))
- self._cache_hash = md5.hexdigest()
if self.filename:
md5.update(self.filename)
+ cache_filename = md5.hexdigest()
self._cache_file = open(os.path.join(
- arv_cmd.make_home_conf_dir('.cache/arvados/arv-put', 0o700, 'raise'),
- self._cache_hash), 'a+')
+ arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
+ cache_filename), 'a+')
+ self._cache_filename = self._cache_file.name
+ self._lock_file(self._cache_file)
self._cache_file.seek(0)
with self._state_lock:
try:
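The cache key is now derived after folding in the optional filename, so two
invocations differing only in --filename no longer collide on the same state
file (previously the hexdigest was taken before the filename update). A
sketch of the derivation as a standalone helper:

    import hashlib
    import os

    def cache_key(paths, filename=None, api_host=None):
        # Mirrors the hunk above: identical inputs -> identical cache file name,
        # stored under ~/.cache/arvados/arv-put/<hexdigest>.
        md5 = hashlib.md5()
        md5.update(api_host or '!nohost')
        md5.update('\0'.join(sorted(os.path.realpath(p) for p in paths)))
        if filename:
            md5.update(filename)
        return md5.hexdigest()

The cache file is also locked (via _lock_file) so two arv-put processes
cannot resume the same upload concurrently.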
@@ -544,13 +547,13 @@ class ArvPutUploadJob(object):
with self._state_lock:
state = self._state
new_cache_fd, new_cache_name = tempfile.mkstemp(
- dir=os.path.dirname(self._cache_file.name))
+ dir=os.path.dirname(self._cache_filename))
self._lock_file(new_cache_fd)
new_cache = os.fdopen(new_cache_fd, 'r+')
json.dump(state, new_cache)
- # new_cache.flush()
- # os.fsync(new_cache)
- os.rename(new_cache_name, self._cache_file.name)
+ new_cache.flush()
+ os.fsync(new_cache)
+ os.rename(new_cache_name, self._cache_filename)
except (IOError, OSError, ResumeCacheConflict) as error:
try:
os.unlink(new_cache_name)
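With the flush/fsync pair enabled (it was commented out before), _save_state()
now follows the standard atomic-replace pattern: write the new state to a temp
file in the same directory, force it to disk, then rename over the old file.
The pattern in isolation (a generic sketch, not the exact method):

    import json
    import os
    import tempfile

    def atomic_save(state, dest_path):
        fd, tmp_name = tempfile.mkstemp(dir=os.path.dirname(dest_path))
        try:
            with os.fdopen(fd, 'w') as tmp:
                json.dump(state, tmp)
                tmp.flush()
                os.fsync(tmp.fileno())  # data must hit disk before the rename
            os.rename(tmp_name, dest_path)  # atomic on POSIX filesystems
        except Exception:
            os.unlink(tmp_name)
            raise

Writing the temp file to the same directory matters because rename() is only
atomic within one filesystem; skipping the fsync risks renaming an empty or
truncated file into place after a crash.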
@@ -607,81 +610,6 @@ class ArvPutUploadJob(object):
return datablocks
-class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
- STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
- ['bytes_written', '_seen_inputs'])
-
- def __init__(self, cache=None, reporter=None, bytes_expected=None, **kwargs):
- self.bytes_written = 0
- self._seen_inputs = []
- self.cache = cache
- self.reporter = reporter
- self.bytes_expected = bytes_expected
- super(ArvPutCollectionWriter, self).__init__(**kwargs)
-
- @classmethod
- def from_cache(cls, cache, reporter=None, bytes_expected=None,
- num_retries=0, replication=0):
- try:
- state = cache.load()
- state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
- writer = cls.from_state(state, cache, reporter, bytes_expected,
- num_retries=num_retries,
- replication=replication)
- except (TypeError, ValueError,
- arvados.errors.StaleWriterStateError) as error:
- return cls(cache, reporter, bytes_expected,
- num_retries=num_retries,
- replication=replication)
- else:
- return writer
-
- def cache_state(self):
- if self.cache is None:
- return
- state = self.dump_state()
- # Transform attributes for serialization.
- for attr, value in state.items():
- if attr == '_data_buffer':
- state[attr] = base64.encodestring(''.join(value))
- elif hasattr(value, 'popleft'):
- state[attr] = list(value)
- self.cache.save(state)
-
- def report_progress(self):
- if self.reporter is not None:
- self.reporter(self.bytes_written, self.bytes_expected)
-
- def flush_data(self):
- start_buffer_len = self._data_buffer_len
- start_block_count = self.bytes_written / arvados.config.KEEP_BLOCK_SIZE
- super(ArvPutCollectionWriter, self).flush_data()
- if self._data_buffer_len < start_buffer_len: # We actually PUT data.
- self.bytes_written += (start_buffer_len - self._data_buffer_len)
- self.report_progress()
- if (self.bytes_written / arvados.config.KEEP_BLOCK_SIZE) > start_block_count:
- self.cache_state()
-
- def _record_new_input(self, input_type, source_name, dest_name):
- # The key needs to be a list because that's what we'll get back
- # from JSON deserialization.
- key = [input_type, source_name, dest_name]
- if key in self._seen_inputs:
- return False
- self._seen_inputs.append(key)
- return True
-
- def write_file(self, source, filename=None):
- if self._record_new_input('file', source, filename):
- super(ArvPutCollectionWriter, self).write_file(source, filename)
-
- def write_directory_tree(self,
- path, stream_name='.', max_manifest_depth=-1):
- if self._record_new_input('directory', path, stream_name):
- super(ArvPutCollectionWriter, self).write_directory_tree(
- path, stream_name, max_manifest_depth)
-
-
def expected_bytes_for(pathlist):
# Walk the given directory trees and stat files, adding up file sizes,
# so we can display progress as percent
@@ -849,6 +777,8 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
if status != 0:
sys.exit(status)
+ else:
+ writer.destroy_cache()
return output
diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
old mode 100755
new mode 100644
index 5a4269b..87facac
--- a/sdk/python/tests/test_arv_put.py
+++ b/sdk/python/tests/test_arv_put.py
@@ -13,8 +13,7 @@ import tempfile
import time
import unittest
import yaml
-import multiprocessing
import shutil
+import threading
import hashlib
import random
@@ -238,66 +237,34 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
arv_put.ResumeCache, path)
-class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,
- ArvadosBaseTestCase):
+class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
+ ArvadosBaseTestCase):
def setUp(self):
- super(ArvadosPutCollectionWriterTest, self).setUp()
+ super(ArvPutUploadJobTest, self).setUp()
run_test_server.authorize_with('active')
- with tempfile.NamedTemporaryFile(delete=False) as cachefile:
- self.cache = arv_put.ResumeCache(cachefile.name)
- self.cache_filename = cachefile.name
+ self.exit_lock = threading.Lock()
+ self.save_manifest_lock = threading.Lock()
def tearDown(self):
- super(ArvadosPutCollectionWriterTest, self).tearDown()
- if os.path.exists(self.cache_filename):
- self.cache.destroy()
- self.cache.close()
-
- def test_writer_caches(self):
- cwriter = arv_put.ArvPutCollectionWriter(self.cache)
- cwriter.write_file('/dev/null')
- cwriter.cache_state()
- self.assertTrue(self.cache.load())
- self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
+ super(ArvPutUploadJobTest, self).tearDown()
def test_writer_works_without_cache(self):
- cwriter = arv_put.ArvPutCollectionWriter()
- cwriter.write_file('/dev/null')
- self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
-
- def test_writer_resumes_from_cache(self):
- cwriter = arv_put.ArvPutCollectionWriter(self.cache)
- with self.make_test_file() as testfile:
- cwriter.write_file(testfile.name, 'test')
- cwriter.cache_state()
- new_writer = arv_put.ArvPutCollectionWriter.from_cache(
- self.cache)
- self.assertEqual(
- ". 098f6bcd4621d373cade4e832627b4f6+4 0:4:test\n",
- new_writer.manifest_text())
-
- def test_new_writer_from_stale_cache(self):
- cwriter = arv_put.ArvPutCollectionWriter(self.cache)
- with self.make_test_file() as testfile:
- cwriter.write_file(testfile.name, 'test')
- new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
- new_writer.write_file('/dev/null')
- self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", new_writer.manifest_text())
-
- def test_new_writer_from_empty_cache(self):
- cwriter = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
- cwriter.write_file('/dev/null')
+ cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
+ cwriter.start()
self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
- def test_writer_resumable_after_arbitrary_bytes(self):
- cwriter = arv_put.ArvPutCollectionWriter(self.cache)
- # These bytes are intentionally not valid UTF-8.
- with self.make_test_file('\x00\x07\xe2') as testfile:
- cwriter.write_file(testfile.name, 'test')
- cwriter.cache_state()
- new_writer = arv_put.ArvPutCollectionWriter.from_cache(
- self.cache)
- self.assertEqual(cwriter.manifest_text(), new_writer.manifest_text())
+ def test_writer_works_with_cache(self):
+ with tempfile.NamedTemporaryFile() as f:
+ f.write('foo')
+ f.flush()
+ cwriter = arv_put.ArvPutUploadJob([f.name])
+ cwriter.start()
+ self.assertEqual(3, cwriter.bytes_written)
+ # Don't destroy the cache, and start another upload
+ cwriter_new = arv_put.ArvPutUploadJob([f.name])
+ cwriter_new.start()
+ self.assertEqual(0, cwriter_new.bytes_written)
+ cwriter_new.destroy_cache()
def make_progress_tester(self):
progression = []
@@ -306,24 +273,94 @@ class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,
return progression, record_func
def test_progress_reporting(self):
- for expect_count in (None, 8):
- progression, reporter = self.make_progress_tester()
- cwriter = arv_put.ArvPutCollectionWriter(
- reporter=reporter, bytes_expected=expect_count)
- with self.make_test_file() as testfile:
- cwriter.write_file(testfile.name, 'test')
- cwriter.finish_current_stream()
- self.assertIn((4, expect_count), progression)
-
- def test_resume_progress(self):
- cwriter = arv_put.ArvPutCollectionWriter(self.cache, bytes_expected=4)
- with self.make_test_file() as testfile:
- # Set up a writer with some flushed bytes.
- cwriter.write_file(testfile.name, 'test')
- cwriter.finish_current_stream()
- cwriter.cache_state()
- new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
- self.assertEqual(new_writer.bytes_written, 4)
+ with tempfile.NamedTemporaryFile() as f:
+ f.write('foo')
+ f.flush()
+ for expect_count in (None, 8):
+ progression, reporter = self.make_progress_tester()
+ cwriter = arv_put.ArvPutUploadJob([f.name],
+ reporter=reporter, bytes_expected=expect_count)
+ cwriter.start()
+ cwriter.destroy_cache()
+ self.assertIn((3, expect_count), progression)
+
+ def test_writer_upload_directory(self):
+ tempdir = tempfile.mkdtemp()
+ subdir = os.path.join(tempdir, 'subdir')
+ os.mkdir(subdir)
+ data = "x" * 1024 # 1 KB
+ for i in range(1, 5):
+ with open(os.path.join(tempdir, str(i)), 'w') as f:
+ f.write(data * i)
+ with open(os.path.join(subdir, 'otherfile'), 'w') as f:
+ f.write(data * 5)
+ cwriter = arv_put.ArvPutUploadJob([tempdir])
+ cwriter.start()
+ cwriter.destroy_cache()
+ shutil.rmtree(tempdir)
+ self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)
+
+ def test_resume_large_file_upload(self):
+ # Proxying ArvadosFile.writeto() method to be able to synchronize it
+ # with partial manifest saves
+ orig_func = getattr(arvados.arvfile.ArvadosFile, 'writeto')
+ def wrapped_func(*args, **kwargs):
+ data = args[2]
+ if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+ # Lock on the last block write call, waiting for the
+ # manifest to be saved
+ self.exit_lock.acquire()
+ raise SystemExit('Test exception')
+ ret = orig_func(*args, **kwargs)
+ self.save_manifest_lock.release()
+ return ret
+ setattr(arvados.arvfile.ArvadosFile, 'writeto', wrapped_func)
+ # Take advantage of the reporter feature to sync the partial
+ # manifest writing with the simulated upload error.
+ def fake_reporter(written, expected):
+ # Wait until there's something to save
+ self.save_manifest_lock.acquire()
+ # Once the partial manifest is saved, allow exiting
+ self.exit_lock.release()
+ # Create random data to be uploaded
+ md5_original = hashlib.md5()
+ _, filename = tempfile.mkstemp()
+ fileobj = open(filename, 'w')
+ # Make sure to write just a little more than one block
+ for _ in range((arvados.config.KEEP_BLOCK_SIZE/(1024*1024))+1):
+ data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
+ md5_original.update(data)
+ fileobj.write(data)
+ fileobj.close()
+ self.exit_lock.acquire()
+ self.save_manifest_lock.acquire()
+ writer = arv_put.ArvPutUploadJob([filename],
+ reporter=fake_reporter,
+ update_time=0.1)
+ # First upload: partially completed with simulated error
+ with self.assertRaises(SystemExit):
+ writer.start()
+ # Avoid getting a ResumeCacheConflict on the 2nd run
+ writer._cache_file.close()
+ self.assertLess(writer.bytes_written, os.path.getsize(filename))
+
+ # Restore the ArvadosFile.writeto() method to before retrying
+ setattr(arvados.arvfile.ArvadosFile, 'writeto', orig_func)
+ writer_new = arv_put.ArvPutUploadJob([filename])
+ writer_new.start()
+ writer_new.destroy_cache()
+ self.assertEqual(os.path.getsize(filename),
+ writer.bytes_written + writer_new.bytes_written)
+ # Read the uploaded file to compare its md5 hash
+ md5_uploaded = hashlib.md5()
+ c = arvados.collection.Collection(writer_new.manifest_text())
+ with c.open(os.path.basename(filename), 'r') as f:
+ new_data = f.read()
+ md5_uploaded.update(new_data)
+ self.assertEqual(md5_original.hexdigest(), md5_uploaded.hexdigest())
+ # Cleaning up
+ os.unlink(filename)
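The two locks give test_resume_large_file_upload a deterministic handshake
between the uploader and the checkpoint thread: the wrapped writeto() releases
save_manifest_lock after each full-block write and blocks on exit_lock before
the final short write, while fake_reporter, called from the periodic _update,
acquires save_manifest_lock and releases exit_lock once the partial manifest
has been saved. A stripped-down sketch of that handshake (illustrative only):

    import threading

    exit_lock = threading.Lock()
    save_lock = threading.Lock()
    exit_lock.acquire()  # held until a checkpoint exists
    save_lock.acquire()  # held until a full block is written

    def uploader():
        # ... write one full block ...
        save_lock.release()  # signal: something worth checkpointing
        exit_lock.acquire()  # wait for the checkpoint to land
        raise SystemExit('simulated failure')

    def reporter(written, expected):
        save_lock.acquire()  # wait until data worth saving exists
        # (the periodic _update has just saved the partial manifest)
        exit_lock.release()  # now the simulated failure may fire

This guarantees the failure only fires after resume state is on disk, so the
second ArvPutUploadJob has something to pick up.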
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py
old mode 100755
new mode 100644
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list