[ARVADOS] created: 44ea4c89930d52d142da2c0b7204fd34f61f8f39
Git user
git at public.curoverse.com
Mon Jul 4 19:09:08 EDT 2016
at 44ea4c89930d52d142da2c0b7204fd34f61f8f39 (commit)
commit 44ea4c89930d52d142da2c0b7204fd34f61f8f39
Author: Lucas Di Pentima <lucas at curoverse.com>
Date: Mon Jul 4 20:07:56 2016 -0300
9463: Resume cache implementation (WIP)
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 906a6bf..dffa576 100755
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -277,19 +277,153 @@ class ResumeCache(object):
self.__init__(self.filename)
+class ArvPutCollectionCache(object):
+ CACHE_DIR = '.cache/arvados/arv-put'
+
+ def __init__(self, paths):
+ md5 = hashlib.md5()
+ md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
+ realpaths = sorted(os.path.realpath(path) for path in paths)
+ self.files = {}
+ for path in realpaths:
+ self._get_file_data(path)
+ # Only hash args paths
+ self.cache_hash = md5.update('\0'.join(realpaths))
+ self.cache_file = open(os.path.join(self.CACHE_DIR, self.cache_hash), 'a+')
+ self._lock_file(self.cache_file)
+ self.filename = self.cache_file.name
+ self.data = self._load()
+
+ def _load(self):
+ try:
+ self.cache_file.seek(0)
+ ret = json.load(self.cache_file)
+ except ValueError:
+ # File empty, set up new cache
+ ret = {
+ 'col_uuid' : None, # Collection UUID
+ 'uploaded' : [], # Uploaded files
+ 'files': {}, # Complete cached file (path, mtime, size) list
+ }
+ return ret
+
+ def set_collection(self, uuid):
+ self.data['col_uuid'] = uuid
+
+ def collection(self):
+ return self.data['col_uuid']
+
+ def is_dirty(self, path):
+ if (self.files[path]['mtime'] != self.data['files'][path]['mtime']) or
+ (self.files[path]['size'] != self.data['files'][path]['size']):
+ return True
+ else:
+ return False
+
+
+ def dirty_files(self):
+ """
+ Files that were previously uploaded but changed locally between
+ upload runs. These files should be re-uploaded.
+ """
+ dirty = []
+ for f in self.data['uploaded']:
+ if self.is_dirty(f):
+ dirty.append(f)
+ return dirty
+
+ def uploaded_files(self):
+ """
+ Files that were uploaded and have not changed locally between
+ upload runs. These files should be checked for partial uploads
+ """
+ uploaded = []
+ for f in self.data['uploaded']:
+ if not self.is_dirty(f):
+ uploaded.append(f)
+ return uploaded
+
+ def pending_files(self):
+ """
+ Files that should be uploaded, because they're dirty or they
+ never had the chance to be uploaded yet.
+ """
+ pending = []
+ uploaded = self.uploaded_files()
+ for f in self.files.keys():
+ if f not in uploaded:
+ pending.append(f)
+ return pending
+
+ def _get_file_data(self, path):
+ if os.path.isfile(path):
+ self.files[path] = {'mtime': os.path.getmtime(path),
+ 'size': os.path.getsize(path)}
+ elif os.path.isdir(path):
+ for item in os.listdir(path):
+ self._get_file_data(os.path.join(path, item))
+
+ def _lock_file(self, fileobj):
+ try:
+ fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
+ except IOError:
+ raise ResumeCacheConflict("{} locked".format(fileobj.name))
+
+
+class ArvPutUploader(object):
+ def __init__(self, paths):
+ self.cache = ArvPutCollectionCache(paths)
+ if self.cache.collection() is not None:
+ self.collection = ArvPutCollection(locator=self.cache.collection(), cache=self.cache)
+ else:
+ self.collection = ArvPutCollection(cache=self.cache)
+ self.cache.set_collection(self.collection.locator())
+
+ # # Remove uploaded 'dirty' files from collection
+ # for f in self.cache.dirty_files():
+ # self.collection.remove_file(f)
+ #
+ # # Upload pending files
+ # for f in self.cache.pending_files():
+ # self.collection.write_file(f, os.path.join('.', f))
+ # self.cache.file_uploaded(f) #TODO
+
+ for p in paths:
+ if os.path.isdir(p):
+ self.collection.write_directory_tree(p)
+ elif os.path.isfile(p):
+ self.collection.write_file(p)
+
+
class ArvPutCollection(object):
- def __init__(self, cache=None, reporter=None, bytes_expected=None, **kwargs):
+ def __init__(self, locator = None, cache=None, reporter=None,
+ bytes_expected=None, **kwargs):
self.collection_flush_time = 60
self.bytes_written = 0
self._seen_inputs = []
self.cache = cache
self.reporter = reporter
self.bytes_expected = bytes_expected
- # super(ArvPutCollection, self).__init__(**kwargs)
- self.collection = arvados.collection.Collection()
- self.collection.save_new()
-
+
+ if locator is None:
+ self.collection = arvados.collection.Collection()
+ self.collection.save_new()
+ else:
+ self.collection = arvados.collection.Collection(locator)
+
def write_file(self, source, filename):
+ if source in self.cache.dirty_files():
+ print "DIRTY: Removing file %s to be uploaded again" % source
+ self.collection.remove(filename)
+ elif source in self.cache.uploaded_files():
+ # TODO: Check for partial uploads
+ pass
+
+ # if not source in self.cache.pending_files():
+ # print "WARNING: file %s already uploaded, skipping!" % source
+ # return
+
+ print "Writing file, source: %s, filename: %s" % (source, filename)
with self.collection as c:
with open(source, 'r') as source_fd:
output = c.open(filename, 'w')
@@ -299,21 +433,23 @@ class ArvPutCollection(object):
if not data:
break
output.write(data)
- output.flush()
+ output.flush() # Commit block to Keep
+ self.bytes_written += len(data)
# Is it time to update the collection?
if (time.time() - start_time) > self.collection_flush_time:
self.collection.save()
start_time = time.time()
# File write finished
output.close()
- self.collection.save() # TODO: Is this necessary?
+ self.collection.save() # One last save...
def write_directory_tree(self, path, stream_name='.', max_manifest_depth=-1):
if os.path.isdir(path):
for item in os.listdir(path):
- if os.path.isdir(item):
+ print "Checking path: '%s' - stream_name: '%s'" % (path, stream_name)
+ if os.path.isdir(os.path.join(path, item)):
self.write_directory_tree(os.path.join(path, item),
- os.path.join(stream_name, path, item))
+ os.path.join(stream_name, item))
else:
self.write_file(os.path.join(path, item),
os.path.join(stream_name, item))
commit ef06ae1ddabb8ab6977090deb756baa2e4fb6eb5
Author: Lucas Di Pentima <lucas at curoverse.com>
Date: Fri Jul 1 19:05:18 2016 -0300
9463: Initial coding with tests
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
old mode 100644
new mode 100755
index 5cb699f..906a6bf
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -14,6 +14,7 @@ import hashlib
import json
import os
import pwd
+import time
import signal
import socket
import sys
@@ -276,6 +277,57 @@ class ResumeCache(object):
self.__init__(self.filename)
+class ArvPutCollection(object):
+ def __init__(self, cache=None, reporter=None, bytes_expected=None, **kwargs):
+ self.collection_flush_time = 60
+ self.bytes_written = 0
+ self._seen_inputs = []
+ self.cache = cache
+ self.reporter = reporter
+ self.bytes_expected = bytes_expected
+ # super(ArvPutCollection, self).__init__(**kwargs)
+ self.collection = arvados.collection.Collection()
+ self.collection.save_new()
+
+ def write_file(self, source, filename):
+ with self.collection as c:
+ with open(source, 'r') as source_fd:
+ output = c.open(filename, 'w')
+ start_time = time.time()
+ while True:
+ data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
+ if not data:
+ break
+ output.write(data)
+ output.flush()
+ # Is it time to update the collection?
+ if (time.time() - start_time) > self.collection_flush_time:
+ self.collection.save()
+ start_time = time.time()
+ # File write finished
+ output.close()
+ self.collection.save() # TODO: Is this necessary?
+
+ def write_directory_tree(self, path, stream_name='.', max_manifest_depth=-1):
+ if os.path.isdir(path):
+ for item in os.listdir(path):
+ if os.path.isdir(item):
+ self.write_directory_tree(os.path.join(path, item),
+ os.path.join(stream_name, path, item))
+ else:
+ self.write_file(os.path.join(path, item),
+ os.path.join(stream_name, item))
+
+ def manifest(self):
+ print "BLOCK SIZE: %d" % arvados.config.KEEP_BLOCK_SIZE
+ print "MANIFEST Locator:\n%s\nMANIFEST TEXT:\n%s" % (self.collection.manifest_locator(), self.collection.manifest_text())
+ return True
+
+ def report_progress(self):
+ if self.reporter is not None:
+ self.reporter(self.bytes_written, self.bytes_expected)
+
+
class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
['bytes_written', '_seen_inputs'])
diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
old mode 100644
new mode 100755
index e64d914..54a70bf
--- a/sdk/python/tests/test_arv_put.py
+++ b/sdk/python/tests/test_arv_put.py
@@ -234,6 +234,39 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
arv_put.ResumeCache, path)
+class ArvadosPutCollectionTest(run_test_server.TestCaseWithServers):
+ MAIN_SERVER = {}
+ KEEP_SERVER = {}
+
+ def test_write_files(self):
+ c = arv_put.ArvPutCollection()
+ data = 'a' * 1024 * 1024 # 1 MB
+ tmpdir = tempfile.mkdtemp()
+ for size in [1, 10, 64, 128]:
+ with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
+ for _ in range(size):
+ f.write(data)
+ c.write_file(f.name, os.path.basename(f.name))
+ os.unlink(f.name)
+ self.assertEqual(True, c.manifest())
+
+ def test_write_directory(self):
+ c = arv_put.ArvPutCollection()
+ data = 'b' * 1024 * 1024
+ tmpdir = tempfile.mkdtemp()
+ for size in [1, 5, 10, 70]:
+ with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
+ for _ in range(size):
+ f.write(data)
+ os.mkdir(os.path.join(tmpdir, 'subdir1'))
+ for size in [2, 4, 6]:
+ with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f:
+ for _ in range(size):
+ f.write(data)
+ c.write_directory_tree(tmpdir, os.path.join('.', os.path.basename(tmpdir)))
+ self.assertEqual(True, c.manifest())
+
+
class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,
ArvadosBaseTestCase):
def setUp(self):
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list