[ARVADOS] updated: 3847357436978a97470e0769f3200d354a2bd08e
Git user
git at public.curoverse.com
Tue Jul 5 17:53:53 EDT 2016
Summary of changes:
sdk/python/arvados/commands/put.py | 151 +++++++++++++++++++++++++++----------
sdk/python/tests/test_arv_put.py | 51 +++++++++----
2 files changed, 145 insertions(+), 57 deletions(-)
via 3847357436978a97470e0769f3200d354a2bd08e (commit)
from 44ea4c89930d52d142da2c0b7204fd34f61f8f39 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 3847357436978a97470e0769f3200d354a2bd08e
Author: Lucas Di Pentima <lucas at curoverse.com>
Date: Tue Jul 5 18:53:37 2016 -0300
9463: Lots of progress today, resume upload code written, many tests to do!
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index dffa576..6e216ee 100755
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -278,8 +278,6 @@ class ResumeCache(object):
class ArvPutCollectionCache(object):
- CACHE_DIR = '.cache/arvados/arv-put'
-
def __init__(self, paths):
md5 = hashlib.md5()
md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
@@ -288,8 +286,12 @@ class ArvPutCollectionCache(object):
for path in realpaths:
self._get_file_data(path)
# Only hash args paths
- self.cache_hash = md5.update('\0'.join(realpaths))
- self.cache_file = open(os.path.join(self.CACHE_DIR, self.cache_hash), 'a+')
+ md5.update('\0'.join(realpaths))
+ self.cache_hash = md5.hexdigest()
+
+ self.cache_file = open(os.path.join(
+ arv_cmd.make_home_conf_dir('.cache/arvados/arv-put', 0o700, 'raise'),
+ self.cache_hash), 'a+')
self._lock_file(self.cache_file)
self.filename = self.cache_file.name
self.data = self._load()
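For context: the cache key computed above is an MD5 over the API host,
the API token, and the NUL-joined list of resolved argument paths, so
repeating the same arv-put invocation lands on the same cache file. A
minimal standalone sketch of that scheme -- cache_file_name is a
hypothetical helper, and the token update is assumed from the
surrounding SDK code, since only the host update is visible in the
hunk above:

    import hashlib
    import os

    def cache_file_name(api_host, api_token, paths):
        # Hash host, token and resolved argument paths, in that order,
        # so one invocation always resumes from the same cache file.
        md5 = hashlib.md5()
        md5.update(api_host or '!nohost')
        md5.update(api_token or '!notoken')
        md5.update('\0'.join(os.path.realpath(p) for p in paths))
        return md5.hexdigest()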
@@ -301,25 +303,56 @@ class ArvPutCollectionCache(object):
except ValueError:
# File empty, set up new cache
ret = {
- 'col_uuid' : None, # Collection UUID
- 'uploaded' : [], # Uploaded file
- 'files': {}, # Complete cached file (path, mtime, size) list
+ 'col_locator' : None, # Collection locator
+ 'uploaded' : {}, # Uploaded file list: {path : {size, mtime}}
}
return ret
+ def _save(self):
+ """
+ Atomically save (create temp file & rename() it)
+ """
+ # TODO: It would be a good idea to avoid _save() spamming when writing
+ # lots of small files.
+ print "SAVE START"
+ try:
+ new_cache_fd, new_cache_name = tempfile.mkstemp(
+ dir=os.path.dirname(self.filename))
+ self._lock_file(new_cache_fd)
+ new_cache = os.fdopen(new_cache_fd, 'r+')
+ json.dump(self.data, new_cache)
+ os.rename(new_cache_name, self.filename)
+ except (IOError, OSError, ResumeCacheConflict) as error:
+ print "SAVE ERROR: %s" % error
+ try:
+ os.unlink(new_cache_name)
+ except NameError: # mkstemp failed.
+ pass
+ else:
+ print "SAVE DONE!! %s" % self.filename
+ self.cache_file.close()
+ self.cache_file = new_cache
+
+ def file_uploaded(self, path):
+ if path in self.files.keys():
+ self.data['uploaded'][path] = self.files[path]
+ self._save()
+
def set_collection(self, uuid):
- self.data['col_uuid'] = uuid
+ self.data['col_locator'] = uuid
def collection(self):
- return self.data['col_uuid']
+ return self.data['col_locator']
def is_dirty(self, path):
- if (self.files[path]['mtime'] != self.data['files'][path]['mtime']) or
- (self.files[path]['size'] != self.data['files'][path]['size']):
+ if not path in self.data['uploaded'].keys():
+ # Cannot be dirty if it wasn't even uploaded
+ return False
+
+ if (self.files[path]['mtime'] != self.data['uploaded'][path]['mtime'] or
+ self.files[path]['size'] != self.data['uploaded'][path]['size']):
return True
else:
return False
-
def dirty_files(self):
"""
@@ -327,7 +360,7 @@ class ArvPutCollectionCache(object):
upload runs. These files should be re-uploaded.
"""
dirty = []
- for f in self.data['uploaded']:
+ for f in self.data['uploaded'].keys():
if self.is_dirty(f):
dirty.append(f)
return dirty
@@ -338,14 +371,14 @@ class ArvPutCollectionCache(object):
upload runs. These files should be checked for partial uploads
"""
uploaded = []
- for f in self.data['uploaded']:
+ for f in self.data['uploaded'].keys():
if not self.is_dirty(f):
uploaded.append(f)
return uploaded
def pending_files(self):
"""
- Files that should be uploaded, because they're dirty or thet
+ Files that should be uploaded, because they are dirty or
never had the chance to be uploaded yet.
"""
pending = []
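dirty_files(), uploaded_files() and pending_files() partition the
cached state between runs. The pending_files() loop body falls outside
this hunk, so the last rule below is inferred from its docstring; a
condensed sketch, assuming the {path: {'mtime': ..., 'size': ...}}
layout that _load() sets up:

    def classify(files, uploaded):
        # files:    current on-disk state, {path: {'mtime': m, 'size': s}}
        # uploaded: state recorded at upload time, same layout.
        dirty = [p for p in uploaded
                 if files[p]['mtime'] != uploaded[p]['mtime']
                 or files[p]['size'] != uploaded[p]['size']]
        done = [p for p in uploaded if p not in dirty]
        # Pending: changed since upload, or never uploaded at all.
        pending = [p for p in files if p not in uploaded or p in dirty]
        return dirty, done, pending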
@@ -369,6 +402,16 @@ class ArvPutCollectionCache(object):
except IOError:
raise ResumeCacheConflict("{} locked".format(fileobj.name))
+ def close(self):
+ self.cache_file.close()
+
+ def destroy(self):
+ # try:
+ # os.unlink(self.filename)
+ # except OSError as error:
+ # if error.errno != errno.ENOENT: # That's what we wanted anyway.
+ # raise
+ self.close()
class ArvPutUploader(object):
def __init__(self, paths):
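The cache file itself is guarded by _lock_file(); only its except
clause is visible above, but the IOError-to-ResumeCacheConflict
translation matches the usual non-blocking fcntl pattern (the actual
lock call is outside the hunk, so this is an assumption):

    import fcntl

    def lock_file(fileobj):
        # Request an exclusive lock without blocking: if another arv-put
        # already holds this cache, flock() raises IOError immediately,
        # which the caller reports as ResumeCacheConflict.
        fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)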
@@ -377,26 +420,23 @@ class ArvPutUploader(object):
self.collection = ArvPutCollection(locator=self.cache.collection(), cache=self.cache)
else:
self.collection = ArvPutCollection(cache=self.cache)
- self.cache.set_collection(self.collection.locator())
-
- # # Remove uploaded 'dirty' files from collection
- # for f in self.cache.dirty_files():
- # self.collection.remove_file(f)
- #
- # # Upload pending files
- # for f in self.cache.pending_files():
- # self.collection.write_file(f, os.path.join('.', f))
- # self.cache.file_uploaded(f) #TODO
-
+ self.cache.set_collection(self.collection.manifest_locator())
for p in paths:
if os.path.isdir(p):
self.collection.write_directory_tree(p)
elif os.path.isfile(p):
self.collection.write_file(p)
+ self.cache.destroy()
+
+ def manifest(self):
+ return self.collection.manifest()
+
+ def bytes_written(self):
+ return self.collection.bytes_written
class ArvPutCollection(object):
- def __init__(self, locator = None, cache=None, reporter=None,
+ def __init__(self, locator=None, cache=None, reporter=None,
bytes_expected=None, **kwargs):
self.collection_flush_time = 60
self.bytes_written = 0
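With that, ArvPutUploader becomes the one-call driver: it opens or
resumes the cached collection, walks each argument path, and destroys
the cache once everything is written. Hypothetical usage, with names
from the diff and an invented path:

    uploader = ArvPutUploader(['/tmp/dataset'])
    print uploader.bytes_written()
    print uploader.manifest()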
@@ -410,23 +450,47 @@ class ArvPutCollection(object):
self.collection.save_new()
else:
self.collection = arvados.collection.Collection(locator)
+
+ def manifest_locator(self):
+ return self.collection.manifest_locator()
def write_file(self, source, filename):
- if source in self.cache.dirty_files():
- print "DIRTY: Removing file %s to be uploaded again" % source
+ if self.cache and source in self.cache.dirty_files():
+ print "DIRTY: Removing file %s from collection to be uploaded again" % source
self.collection.remove(filename)
- elif source in self.cache.uploaded_files():
- # TODO: Check for partial uploads
- pass
- # if not source in self.cache.pending_files():
- # print "WARNING: file %s already uploaded, skipping!" % source
- # return
-
- print "Writing file, source: %s, filename: %s" % (source, filename)
- with self.collection as c:
- with open(source, 'r') as source_fd:
- output = c.open(filename, 'w')
+ resume_offset = 0
+ resume_upload = False
+
+ print "FIND file %s" % filename
+ if self.collection.find(filename):
+ print "File %s already in the collection, checking!" % source
+ if os.path.getsize(source) == self.collection.find(filename).size():
+ print "WARNING: file %s already uploaded, skipping!" % source
+ # File already there, skip it.
+ return
+ elif os.path.getsize(source) > self.collection.find(filename).size():
+ print "WARNING: RESUMING file %s" % source
+ # File partially uploaded, resume!
+ resume_upload = True
+ resume_offset = self.collection.find(filename).size()
+ else:
+ # Source file smaller than uploaded file, what happened here?
+ # TODO: Raise exception of some kind?
+ pass
+
+ with open(source, 'r') as source_fd:
+ with self.collection as c:
+ if resume_upload:
+ print "Resuming file, source: %s, filename: %s" % (source, filename)
+ output = c.open(filename, 'a')
+ source_fd.seek(resume_offset)
+ first_block = False
+ else:
+ print "Writing file, source: %s, filename: %s" % (source, filename)
+ output = c.open(filename, 'w')
+ first_block = True
+
start_time = time.time()
while True:
data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
@@ -439,6 +503,11 @@ class ArvPutCollection(object):
if (time.time() - start_time) > self.collection_flush_time:
self.collection.save()
start_time = time.time()
+ # Once the file's first block is written, mark it as uploaded in the cache
+ if first_block:
+ if self.cache:
+ self.cache.file_uploaded(source)
+ first_block = False
# File write finished
output.close()
self.collection.save() # One last save...
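So the resume path reduces to: open the collection file in append
mode, seek the local source past the bytes the collection already
holds, and copy block by block, saving at most every
collection_flush_time seconds and marking the file uploaded in the
cache once its first block lands. A condensed sketch of the core copy
loop (resume_copy is a hypothetical name; saving, caching and progress
reporting omitted):

    def resume_copy(source_path, dest, offset, block_size):
        # dest: collection file already open in 'a' mode.
        # offset: bytes of this file the collection already holds.
        with open(source_path, 'r') as src:
            src.seek(offset)
            while True:
                data = src.read(block_size)
                if not data:
                    break
                dest.write(data)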
@@ -456,7 +525,7 @@ class ArvPutCollection(object):
def manifest(self):
print "BLOCK SIZE: %d" % arvados.config.KEEP_BLOCK_SIZE
- print "MANIFEST Locator:\n%s\nMANIFEST TEXT:\n%s" % (self.collection.manifest_locator(), self.collection.manifest_text())
+ print "MANIFEST Locator:\n%s\nMANIFEST TEXT:\n%s" % (self.manifest_locator(), self.collection.manifest_text())
return True
def report_progress(self):
diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
index 54a70bf..c4ce823 100755
--- a/sdk/python/tests/test_arv_put.py
+++ b/sdk/python/tests/test_arv_put.py
@@ -237,21 +237,37 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
class ArvadosPutCollectionTest(run_test_server.TestCaseWithServers):
MAIN_SERVER = {}
KEEP_SERVER = {}
+ import shutil
- def test_write_files(self):
- c = arv_put.ArvPutCollection()
- data = 'a' * 1024 * 1024 # 1 MB
- tmpdir = tempfile.mkdtemp()
- for size in [1, 10, 64, 128]:
- with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
- for _ in range(size):
- f.write(data)
- c.write_file(f.name, os.path.basename(f.name))
- os.unlink(f.name)
- self.assertEqual(True, c.manifest())
-
- def test_write_directory(self):
- c = arv_put.ArvPutCollection()
+ # def test_write_files(self):
+ # c = arv_put.ArvPutCollection()
+ # data = 'a' * 1024 * 1024 # 1 MB
+ # tmpdir = tempfile.mkdtemp()
+ # for size in [1, 10, 64, 128]:
+ # with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
+ # for _ in range(size):
+ # f.write(data)
+ # c.write_file(f.name, os.path.basename(f.name))
+ # shutil.rmtree(tmpdir)
+ # self.assertEqual(True, c.manifest())
+ #
+ # def test_write_directory(self):
+ # data = 'b' * 1024 * 1024
+ # tmpdir = tempfile.mkdtemp()
+ # for size in [1, 5, 10, 70]:
+ # with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
+ # for _ in range(size):
+ # f.write(data)
+ # os.mkdir(os.path.join(tmpdir, 'subdir1'))
+ # for size in [2, 4, 6]:
+ # with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f:
+ # for _ in range(size):
+ # f.write(data)
+ # c = arv_put.ArvPutUploader([tmpdir])
+ # shutil.rmtree(tmpdir)
+ # self.assertEqual(True, c.manifest())
+
+ def test_write_directory_twice(self):
data = 'b' * 1024 * 1024
tmpdir = tempfile.mkdtemp()
for size in [1, 5, 10, 70]:
@@ -263,8 +279,11 @@ class ArvadosPutCollectionTest(run_test_server.TestCaseWithServers):
with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f:
for _ in range(size):
f.write(data)
- c.write_directory_tree(tmpdir, os.path.join('.', os.path.basename(tmpdir)))
- self.assertEqual(True, c.manifest())
+ c = arv_put.ArvPutUploader([tmpdir])
+ d = arv_put.ArvPutUploader([tmpdir])
+ print "ESCRIDIERON: c: %d, d: %d" % (c.bytes_written(), d.bytes_written())
+ shutil.rmtree(tmpdir)
+ self.assertEqual(0, d.bytes_written())
class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,
-----------------------------------------------------------------------
hooks/post-receive
--