[ARVADOS] updated: e69ce852e1cbbe5bab82e32ec5d1874ef5a768f3
Git user
git at public.curoverse.com
Fri Jul 29 09:10:51 EDT 2016
Summary of changes:
sdk/python/arvados/arvfile.py | 10 ++--
sdk/python/arvados/collection.py | 40 +++++++++-------
sdk/python/arvados/commands/put.py | 91 +++++++++++++++---------------------
sdk/python/tests/test_arv_put.py | 90 +++++++++++++++++++----------------
sdk/python/tests/test_arvfile.py | 5 +-
sdk/python/tests/test_collections.py | 18 +++++++
6 files changed, 137 insertions(+), 117 deletions(-)
via e69ce852e1cbbe5bab82e32ec5d1874ef5a768f3 (commit)
via 62d56bce0d714cc2df2ab5e7f1005dc3d76f783b (commit)
via 69902ee6583e1de32786e80b77c8f61870ed6f90 (commit)
via 9a363fce5687e55c5554b3eaeee16e7f1f0791f6 (commit)
via 2b52a4885952a8a3eed01b03af33210fc86d6ce5 (commit)
via 05e30c2874cfee6e448de38254d4eb6007abb1cd (commit)
via bca6c8e4c3d880955d19f7b6ff50bc3fbc31146c (commit)
from 1487c0577921c88801c377d753d1322fd1485968 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit e69ce852e1cbbe5bab82e32ec5d1874ef5a768f3
Author: Lucas Di Pentima <lucas at curoverse.com>
Date: Fri Jul 29 10:08:36 2016 -0300
9463: Unify replication_desired & write_copies parameters to only one, passing it at Collection's constructor and let the class decide which value is best from default when None is passed.
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index f076773..b2c40f1 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -289,7 +289,7 @@ class ArvPutUploadJob(object):
def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
name=None, owner_uuid=None, ensure_unique_name=False,
- num_retries=None, write_copies=None, replication=None,
+ num_retries=None, replication_desired=None,
filename=None, update_time=60.0):
self.paths = paths
self.resume = resume
@@ -301,8 +301,7 @@ class ArvPutUploadJob(object):
self.owner_uuid = owner_uuid
self.ensure_unique_name = ensure_unique_name
self.num_retries = num_retries
- self.write_copies = write_copies
- self.replication = replication
+ self.replication_desired = replication_desired
self.filename = filename
self._state_lock = threading.Lock()
self._state = None # Previous run state (file list & manifest)
@@ -345,17 +344,17 @@ class ArvPutUploadJob(object):
def save_collection(self):
with self._collection_lock:
self._my_collection().save_new(
- name=self.name, owner_uuid=self.owner_uuid,
- ensure_unique_name=self.ensure_unique_name,
- num_retries=self.num_retries,
- replication_desired=self.replication)
+ name=self.name, owner_uuid=self.owner_uuid,
+ ensure_unique_name=self.ensure_unique_name,
+ num_retries=self.num_retries)
def destroy_cache(self):
if self.resume:
try:
os.unlink(self._cache_filename)
except OSError as error:
- if error.errno != errno.ENOENT: # That's what we wanted anyway.
+ # That's what we wanted anyway.
+ if error.errno != errno.ENOENT:
raise
self._cache_file.close()
@@ -440,7 +439,8 @@ class ArvPutUploadJob(object):
'mtime': os.path.getmtime(source),
'size' : os.path.getsize(source)
}
- cached_file_data = self._state['files'][source]
+ with self._state_lock:
+ cached_file_data = self._state['files'][source]
# See if this file was already uploaded at least partially
if file_in_collection:
if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
@@ -488,12 +488,12 @@ class ArvPutUploadJob(object):
if self.resume and manifest is not None:
# Create collection from saved state
self._collection = arvados.collection.Collection(
- manifest,
- num_write_copies=self.write_copies)
+ manifest,
+ replication_desired=self.replication_desired)
else:
# Create new collection
self._collection = arvados.collection.Collection(
- num_write_copies=self.write_copies)
+ replication_desired=self.replication_desired)
return self._collection
def _setup_state(self):
@@ -688,19 +688,6 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
print >>stderr, error
sys.exit(1)
- # write_copies diverges from args.replication here.
- # args.replication is how many copies we will instruct Arvados to
- # maintain (by passing it in collections().create()) after all
- # data is written -- and if None was given, we'll use None there.
- # Meanwhile, write_copies is how many copies of each data block we
- # write to Keep, which has to be a number.
- #
- # If we simply changed args.replication from None to a default
- # here, we'd end up erroneously passing the default replication
- # level (instead of None) to collections().create().
- write_copies = (args.replication or
- api_client._rootDesc.get('defaultCollectionReplication', 2))
-
if args.progress:
reporter = progress_writer(human_progress)
elif args.batch_progress:
@@ -715,8 +702,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
reporter = reporter,
bytes_expected = bytes_expected,
num_retries = args.retries,
- write_copies = write_copies,
- replication = args.replication,
+ replication_desired = args.replication,
name = collection_name,
owner_uuid = project_uuid,
ensure_unique_name = True)
@@ -776,9 +762,9 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
if status != 0:
sys.exit(status)
- else:
- writer.destroy_cache()
+ # Success!
+ writer.destroy_cache()
return output
commit 62d56bce0d714cc2df2ab5e7f1005dc3d76f783b
Author: Lucas Di Pentima <lucas at curoverse.com>
Date: Thu Jul 28 14:58:12 2016 -0300
9463: Unified use of 'replication_desired' param on Collection class at instantiation.
Removed the need to pass the number of copies to be written to keep on save_new() method,
it will be inferred from replication_desired setting or looked up from defaults.
Added functionality to Collection class to keep replication_desired configuration when
loading an already existing collection from API server, with tests validating this new
behaviour.
Corrected some already existing tests to work with these changes.
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 098427c..62b6526 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -1136,7 +1136,7 @@ class Collection(RichCollectionBase):
parent=None,
apiconfig=None,
block_manager=None,
- num_write_copies=None):
+ replication_desired=None):
"""Collection constructor.
:manifest_locator_or_text:
@@ -1144,24 +1144,35 @@ class Collection(RichCollectionBase):
a manifest, raw manifest text, or None (to create an empty collection).
:parent:
the parent Collection, may be None.
+
:apiconfig:
A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
Prefer this over supplying your own api_client and keep_client (except in testing).
Will use default config settings if not specified.
+
:api_client:
The API client object to use for requests. If not specified, create one using `apiconfig`.
+
:keep_client:
the Keep client to use for requests. If not specified, create one using `apiconfig`.
+
:num_retries:
the number of retries for API and Keep requests.
+
:block_manager:
the block manager to use. If not specified, create one.
+ :replication_desired:
+ How many copies should Arvados maintain. If None, API server default
+ configuration applies. If not None, this value will also be used
+ for determining the number of block copies being written.
+
"""
super(Collection, self).__init__(parent)
self._api_client = api_client
self._keep_client = keep_client
self._block_manager = block_manager
+ self.replication_desired = replication_desired
if apiconfig:
self._config = apiconfig
@@ -1169,7 +1180,6 @@ class Collection(RichCollectionBase):
self._config = config.settings()
self.num_retries = num_retries if num_retries is not None else 0
- self.num_write_copies = num_write_copies
self._manifest_locator = None
self._manifest_text = None
self._api_response = None
@@ -1234,7 +1244,8 @@ class Collection(RichCollectionBase):
def _my_api(self):
if self._api_client is None:
self._api_client = ThreadSafeApiCache(self._config)
- self._keep_client = self._api_client.keep
+ if self._keep_client is None:
+ self._keep_client = self._api_client.keep
return self._api_client
@synchronized
@@ -1249,7 +1260,10 @@ class Collection(RichCollectionBase):
@synchronized
def _my_block_manager(self):
if self._block_manager is None:
- self._block_manager = _BlockManager(self._my_keep(), copies=self.num_write_copies)
+ copies = (self.replication_desired or
+ self._my_api()._rootDesc.get('defaultCollectionReplication',
+ 2))
+ self._block_manager = _BlockManager(self._my_keep(), copies=copies)
return self._block_manager
def _remember_api_response(self, response):
@@ -1269,6 +1283,10 @@ class Collection(RichCollectionBase):
uuid=self._manifest_locator).execute(
num_retries=self.num_retries))
self._manifest_text = self._api_response['manifest_text']
+ # If not overriden via kwargs, we should try to load the
+ # replication_desired from the API server
+ if self.replication_desired is None:
+ self.replication_desired = self._api_response.get('replication_desired', None)
return None
except Exception as e:
return e
@@ -1442,8 +1460,7 @@ class Collection(RichCollectionBase):
create_collection_record=True,
owner_uuid=None,
ensure_unique_name=False,
- num_retries=None,
- replication_desired=None):
+ num_retries=None):
"""Save collection to a new collection record.
Commit pending buffer blocks to Keep and, when create_collection_record
@@ -1470,27 +1487,18 @@ class Collection(RichCollectionBase):
:num_retries:
Retry count on API calls (if None, use the collection default)
- :replication_desired:
- How many copies should Arvados maintain. If None, API server default
- configuration applies.
-
"""
self._my_block_manager().commit_all()
text = self.manifest_text(strip=False)
if create_collection_record:
- replication_attr = 'replication_desired'
- if self._my_api()._schema.schemas['Collection']['properties'].get(replication_attr, None) is None:
- # API called it 'redundancy' before #3410.
- replication_attr = 'redundancy'
-
if name is None:
name = "New collection"
ensure_unique_name = True
body = {"manifest_text": text,
"name": name,
- replication_attr: replication_desired}
+ "replication_desired": self.replication_desired}
if owner_uuid:
body["owner_uuid"] = owner_uuid
diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py
index 2e43216..6b35626 100644
--- a/sdk/python/tests/test_arvfile.py
+++ b/sdk/python/tests/test_arvfile.py
@@ -28,7 +28,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
def get_from_cache(self, locator):
self.requests.append(locator)
return self.blocks.get(locator)
- def put(self, data, num_retries=None):
+ def put(self, data, num_retries=None, copies=None):
pdh = tutil.str_keep_locator(data)
self.blocks[pdh] = str(data)
return pdh
@@ -38,6 +38,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
self.body = b
self.response = r
self._schema = ArvadosFileWriterTestCase.MockApi.MockSchema()
+ self._rootDesc = {}
class MockSchema(object):
def __init__(self):
self.schemas = {'Collection': {'properties': {'replication_desired': {'type':'integer'}}}}
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index ff0d684..41c8c01 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -804,6 +804,24 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
+ def test_replication_desired_kept_on_load(self):
+ m = '. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt 0:10:count2.txt\n'
+ c1 = Collection(m, replication_desired=1)
+ c1.save_new()
+ loc = c1.manifest_locator()
+ c2 = Collection(loc)
+ self.assertEqual(c1.manifest_text, c2.manifest_text)
+ self.assertEqual(c1.replication_desired, c2.replication_desired)
+
+ def test_replication_desired_not_loaded_if_provided(self):
+ m = '. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt 0:10:count2.txt\n'
+ c1 = Collection(m, replication_desired=1)
+ c1.save_new()
+ loc = c1.manifest_locator()
+ c2 = Collection(loc, replication_desired=2)
+ self.assertEqual(c1.manifest_text, c2.manifest_text)
+ self.assertNotEqual(c1.replication_desired, c2.replication_desired)
+
def test_init_manifest(self):
m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
. 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt
commit 69902ee6583e1de32786e80b77c8f61870ed6f90
Author: Lucas Di Pentima <lucas at curoverse.com>
Date: Wed Jul 27 09:44:18 2016 -0300
9463: Parameter naming unification on _BlockManager. Corrected mocked discovery doc on test.
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 90db6ce..f2f7df2 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -414,7 +414,7 @@ class _BlockManager(object):
self.prefetch_enabled = True
self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
- self.num_put_copies = copies
+ self.copies = copies
@synchronized
def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
@@ -465,10 +465,10 @@ class _BlockManager(object):
if bufferblock is None:
return
- if self.num_put_copies is None:
+ if self.copies is None:
loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
else:
- loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.num_put_copies)
+ loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
bufferblock.set_state(_BufferBlock.COMMITTED, loc)
except Exception as e:
@@ -581,10 +581,10 @@ class _BlockManager(object):
if sync:
try:
- if self.num_put_copies is None:
+ if self.copies is None:
loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
else:
- loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.num_put_copies)
+ loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
block.set_state(_BufferBlock.COMMITTED, loc)
except Exception as e:
block.set_state(_BufferBlock.ERROR, e)
diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py
index da8bf68..2e43216 100644
--- a/sdk/python/tests/test_arvfile.py
+++ b/sdk/python/tests/test_arvfile.py
@@ -40,7 +40,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
self._schema = ArvadosFileWriterTestCase.MockApi.MockSchema()
class MockSchema(object):
def __init__(self):
- self.schemas = {'Collection': {'properties': {'replication_desired': 2}}}
+ self.schemas = {'Collection': {'properties': {'replication_desired': {'type':'integer'}}}}
class MockCollections(object):
def __init__(self, b, r):
self.body = b
commit 9a363fce5687e55c5554b3eaeee16e7f1f0791f6
Author: Lucas Di Pentima <lucas at curoverse.com>
Date: Tue Jul 26 19:39:27 2016 -0300
9463: Temp files & dirs creation/cleanup moved to setUp and tearDown stages. Replaced usage of reporter function with a second wrapper function to make the test cleaner
diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
index 87facac..7d8d790 100644
--- a/sdk/python/tests/test_arv_put.py
+++ b/sdk/python/tests/test_arv_put.py
@@ -244,9 +244,29 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
run_test_server.authorize_with('active')
self.exit_lock = threading.Lock()
self.save_manifest_lock = threading.Lock()
+ # Temp files creation
+ self.tempdir = tempfile.mkdtemp()
+ subdir = os.path.join(self.tempdir, 'subdir')
+ os.mkdir(subdir)
+ data = "x" * 1024 # 1 KB
+ for i in range(1, 5):
+ with open(os.path.join(self.tempdir, str(i)), 'w') as f:
+ f.write(data * i)
+ with open(os.path.join(subdir, 'otherfile'), 'w') as f:
+ f.write(data * 5)
+ # For large file resuming test
+ _, self.large_file_name = tempfile.mkstemp()
+ fileobj = open(self.large_file_name, 'w')
+ # Make sure to write just a little more than one block
+ for _ in range((arvados.config.KEEP_BLOCK_SIZE/(1024*1024))+1):
+ data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
+ fileobj.write(data)
+ fileobj.close()
def tearDown(self):
super(ArvPutUploadJobTest, self).tearDown()
+ shutil.rmtree(self.tempdir)
+ os.unlink(self.large_file_name)
def test_writer_works_without_cache(self):
cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
@@ -285,57 +305,43 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
self.assertIn((3, expect_count), progression)
def test_writer_upload_directory(self):
- tempdir = tempfile.mkdtemp()
- subdir = os.path.join(tempdir, 'subdir')
- os.mkdir(subdir)
- data = "x" * 1024 # 1 KB
- for i in range(1, 5):
- with open(os.path.join(tempdir, str(i)), 'w') as f:
- f.write(data * i)
- with open(os.path.join(subdir, 'otherfile'), 'w') as f:
- f.write(data * 5)
- cwriter = arv_put.ArvPutUploadJob([tempdir])
+ cwriter = arv_put.ArvPutUploadJob([self.tempdir])
cwriter.start()
cwriter.destroy_cache()
- shutil.rmtree(tempdir)
self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)
def test_resume_large_file_upload(self):
# Proxying ArvadosFile.writeto() method to be able to synchronize it
# with partial manifest saves
- orig_func = getattr(arvados.arvfile.ArvadosFile, 'writeto')
- def wrapped_func(*args, **kwargs):
+ orig_writeto_func = getattr(arvados.arvfile.ArvadosFile, 'writeto')
+ orig_update_func = getattr(arv_put.ArvPutUploadJob, '_update')
+ def wrapped_update(*args, **kwargs):
+ job_instance = args[0]
+ orig_update_func(*args, **kwargs)
+ with self.save_manifest_lock:
+ # Allow abnormal termination when first block written
+ if job_instance._collection_size(job_instance._my_collection()) == arvados.config.KEEP_BLOCK_SIZE:
+ self.exit_lock.release()
+ def wrapped_writeto(*args, **kwargs):
data = args[2]
if len(data) < arvados.config.KEEP_BLOCK_SIZE:
# Lock on the last block write call, waiting for the
# manifest to be saved
- self.exit_lock.acquire()
- raise SystemExit('Test exception')
- ret = orig_func(*args, **kwargs)
+ with self.exit_lock:
+ raise SystemExit('Test exception')
+ ret = orig_writeto_func(*args, **kwargs)
self.save_manifest_lock.release()
return ret
- setattr(arvados.arvfile.ArvadosFile, 'writeto', wrapped_func)
- # Take advantage of the reporter feature to sync the partial
- # manifest writing with the simulated upload error.
- def fake_reporter(written, expected):
- # Wait until there's something to save
- self.save_manifest_lock.acquire()
- # Once the partial manifest is saved, allow exiting
- self.exit_lock.release()
- # Create random data to be uploaded
+ setattr(arvados.arvfile.ArvadosFile, 'writeto', wrapped_writeto)
+ setattr(arv_put.ArvPutUploadJob, '_update', wrapped_update)
+ # MD5 hash of random data to be uploaded
md5_original = hashlib.md5()
- _, filename = tempfile.mkstemp()
- fileobj = open(filename, 'w')
- # Make sure to write just a little more than one block
- for _ in range((arvados.config.KEEP_BLOCK_SIZE/(1024*1024))+1):
- data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
+ with open(self.large_file_name, 'r') as f:
+ data = f.read()
md5_original.update(data)
- fileobj.write(data)
- fileobj.close()
self.exit_lock.acquire()
self.save_manifest_lock.acquire()
- writer = arv_put.ArvPutUploadJob([filename],
- reporter=fake_reporter,
+ writer = arv_put.ArvPutUploadJob([self.large_file_name],
update_time=0.1)
# First upload: partially completed with simulated error
try:
@@ -343,24 +349,26 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
except SystemExit:
# Avoid getting a ResumeCacheConflict on the 2nd run
writer._cache_file.close()
- self.assertLess(writer.bytes_written, os.path.getsize(filename))
+ self.assertGreater(writer.bytes_written, 0)
+ self.assertLess(writer.bytes_written,
+ os.path.getsize(self.large_file_name))
# Restore the ArvadosFile.writeto() method to before retrying
- setattr(arvados.arvfile.ArvadosFile, 'writeto', orig_func)
- writer_new = arv_put.ArvPutUploadJob([filename])
+ setattr(arvados.arvfile.ArvadosFile, 'writeto', orig_writeto_func)
+ # Restore the ArvPutUploadJob._update() method to before retrying
+ setattr(arv_put.ArvPutUploadJob, '_update', orig_update_func)
+ writer_new = arv_put.ArvPutUploadJob([self.large_file_name])
writer_new.start()
writer_new.destroy_cache()
- self.assertEqual(os.path.getsize(filename),
+ self.assertEqual(os.path.getsize(self.large_file_name),
writer.bytes_written + writer_new.bytes_written)
# Read the uploaded file to compare its md5 hash
md5_uploaded = hashlib.md5()
c = arvados.collection.Collection(writer_new.manifest_text())
- with c.open(os.path.basename(filename), 'r') as f:
+ with c.open(os.path.basename(self.large_file_name), 'r') as f:
new_data = f.read()
md5_uploaded.update(new_data)
self.assertEqual(md5_original.hexdigest(), md5_uploaded.hexdigest())
- # Cleaning up
- os.unlink(filename)
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
commit 2b52a4885952a8a3eed01b03af33210fc86d6ce5
Author: Lucas Di Pentima <lucas at curoverse.com>
Date: Tue Jul 26 17:55:29 2016 -0300
9463: Added logging messages to warn the user about anomalous cases
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index eb8199c..f076773 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -21,6 +21,7 @@ import sys
import tempfile
import threading
import copy
+import logging
from apiclient import errors as apiclient_errors
import arvados.commands._util as arv_cmd
@@ -312,6 +313,7 @@ class ArvPutUploadJob(object):
self._stop_checkpointer = threading.Event()
self._checkpointer = threading.Thread(target=self._update_task)
self._update_task_time = update_time # How many seconds wait between update runs
+ self.logger = logging.getLogger('arvados.arv_put')
# Load cached data if any and if needed
self._setup_state()
@@ -451,7 +453,7 @@ class ArvPutUploadJob(object):
resume_offset = file_in_collection.size()
else:
# Inconsistent cache, re-upload the file
- pass # TODO: log warning message
+ self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
else:
# Local file differs from cached data, re-upload it
pass
@@ -551,6 +553,7 @@ class ArvPutUploadJob(object):
os.fsync(new_cache)
os.rename(new_cache_name, self._cache_filename)
except (IOError, OSError, ResumeCacheConflict) as error:
+ self.logger.error("There was a problem while saving the cache file: {}".format(error))
try:
os.unlink(new_cache_name)
except NameError: # mkstemp failed.
commit 05e30c2874cfee6e448de38254d4eb6007abb1cd
Author: Lucas Di Pentima <lucas at curoverse.com>
Date: Tue Jul 26 16:45:29 2016 -0300
9463: Optimizations on _write_ffile()
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 7443dc2..eb8199c 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -422,7 +422,6 @@ class ArvPutUploadJob(object):
def _write_file(self, source, filename):
resume_offset = 0
- resume_upload = False
if self.resume:
# Check if file was already uploaded (at least partially)
with self._collection_lock:
@@ -433,38 +432,38 @@ class ArvPutUploadJob(object):
file_in_collection = None
# If no previous cached data on this file, store it for an eventual
# repeated run.
- if source not in self._state['files'].keys():
+ if source not in self._state['files']:
with self._state_lock:
self._state['files'][source] = {
- 'mtime' : os.path.getmtime(source),
+ 'mtime': os.path.getmtime(source),
'size' : os.path.getsize(source)
}
cached_file_data = self._state['files'][source]
# See if this file was already uploaded at least partially
if file_in_collection:
if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
- if os.path.getsize(source) == file_in_collection.size():
+ if cached_file_data['size'] == file_in_collection.size():
# File already there, skip it.
- self.bytes_skipped += os.path.getsize(source)
+ self.bytes_skipped += cached_file_data['size']
return
- elif os.path.getsize(source) > file_in_collection.size():
+ elif cached_file_data['size'] > file_in_collection.size():
# File partially uploaded, resume!
- resume_upload = True
resume_offset = file_in_collection.size()
else:
# Inconsistent cache, re-upload the file
- pass
+ pass # TODO: log warning message
else:
# Local file differs from cached data, re-upload it
pass
with open(source, 'r') as source_fd:
- if self.resume and resume_upload:
+ if resume_offset > 0:
+ # Start upload where we left off
with self._collection_lock:
- # Open for appending
output = self._my_collection().open(filename, 'a')
source_fd.seek(resume_offset)
self.bytes_skipped += resume_offset
else:
+ # Start from scratch
with self._collection_lock:
output = self._my_collection().open(filename, 'w')
self._write(source_fd, output)
commit bca6c8e4c3d880955d19f7b6ff50bc3fbc31146c
Author: Lucas Di Pentima <lucas at curoverse.com>
Date: Tue Jul 26 14:46:09 2016 -0300
9463: Use a constant as a template for initializing the empty cache
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 31cb0cb..7443dc2 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -20,6 +20,7 @@ import socket
import sys
import tempfile
import threading
+import copy
from apiclient import errors as apiclient_errors
import arvados.commands._util as arv_cmd
@@ -280,6 +281,10 @@ class ResumeCache(object):
class ArvPutUploadJob(object):
CACHE_DIR = '.cache/arvados/arv-put'
+ EMPTY_STATE = {
+ 'manifest' : None, # Last saved manifest checkpoint
+ 'files' : {} # Previous run file list: {path : {size, mtime}}
+ }
def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
name=None, owner_uuid=None, ensure_unique_name=False,
@@ -511,27 +516,19 @@ class ArvPutUploadJob(object):
with self._state_lock:
try:
self._state = json.load(self._cache_file)
- if not 'manifest' in self._state.keys():
- self._state['manifest'] = ""
- if not 'files' in self._state.keys():
- self._state['files'] = {}
+ if not set(['manifest', 'files']).issubset(set(self._state.keys())):
+ # Cache at least partially incomplete, set up new cache
+ self._state = copy.deepcopy(self.EMPTY_STATE)
except ValueError:
- # File empty, set up new cache
- self._state = {
- 'manifest' : None,
- # Previous run file list: {path : {size, mtime}}
- 'files' : {}
- }
+ # Cache file empty, set up new cache
+ self._state = copy.deepcopy(self.EMPTY_STATE)
# Load how many bytes were uploaded on previous run
with self._collection_lock:
self.bytes_written = self._collection_size(self._my_collection())
# No resume required
else:
with self._state_lock:
- self._state = {
- 'manifest' : None,
- 'files' : {} # Previous run file list: {path : {size, mtime}}
- }
+ self._state = copy.deepcopy(self.EMPTY_STATE)
def _lock_file(self, fileobj):
try:
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list