[ARVADOS] created: 44ea4c89930d52d142da2c0b7204fd34f61f8f39

Git user git at public.curoverse.com
Mon Jul 4 19:09:08 EDT 2016

        at  44ea4c89930d52d142da2c0b7204fd34f61f8f39 (commit)

commit 44ea4c89930d52d142da2c0b7204fd34f61f8f39
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Mon Jul 4 20:07:56 2016 -0300

    9463: Resume cache implementation (WIP)

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 906a6bf..dffa576 100755
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -277,19 +277,153 @@ class ResumeCache(object):
+class ArvPutCollectionCache(object):
+    CACHE_DIR = '.cache/arvados/arv-put'
+    def __init__(self, paths):
+        md5 = hashlib.md5()
+        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
+        realpaths = sorted(os.path.realpath(path) for path in paths)
+        self.files = {}
+        for path in realpaths:
+            self._get_file_data(path)
+        # Only hash args paths
+        self.cache_hash = md5.update('\0'.join(realpaths))
+        self.cache_file = open(os.path.join(self.CACHE_DIR, self.cache_hash), 'a+')
+        self._lock_file(self.cache_file)
+        self.filename = self.cache_file.name
+        self.data = self._load()
+    def _load(self):
+        try:
+            self.cache_file.seek(0)
+            ret = json.load(self.cache_file)
+        except ValueError:
+            # File empty, set up new cache
+            ret = {
+                'col_uuid' : None, # Collection UUID
+                'uploaded' : [], # Uploaded file
+                'files': {}, # Complete cached file (path, mtime, size) list
+            }
+        return ret
+    def set_collection(self, uuid):
+        self.data['col_uuid'] = uuid
+    def collection(self):
+        return self.data['col_uuid']
+    def is_dirty(self, path):
+        if (self.files[path]['mtime'] != self.data['files'][path]['mtime']) or
+           (self.files[path]['size'] != self.data['files'][path]['size']):
+            return True
+        else:
+            return False
+    def dirty_files(self):
+        """
+        Files that were previously uploaded but changed locally between 
+        upload runs. These files should be re-uploaded.
+        """
+        dirty = []
+        for f in self.data['uploaded']:
+            if self.is_dirty(f):
+                dirty.append(f)
+        return dirty
+    def uploaded_files(self):
+        """
+        Files that were uploaded and have not changed locally between 
+        upload runs. These files should be checked for partial uploads
+        """
+        uploaded = []
+        for f in self.data['uploaded']:
+            if not self.is_dirty(f):
+                uploaded.append(f)
+        return uploaded
+    def pending_files(self):
+        """
+        Files that should be uploaded, because they're dirty or thet
+        never had the chance to be uploaded yet.
+        """
+        pending = []
+        uploaded = self.uploaded_files()
+        for f in self.files.keys():
+            if f not in uploaded:
+                pending.append(f)
+        return pending
+    def _get_file_data(self, path):
+        if os.path.isfile(path):
+            self.files[path] = {'mtime': os.path.getmtime(path),
+                                'size': os.path.getsize(path)}
+        elif os.path.isdir(path):
+            for item in os.listdir(path):
+                self._get_file_data(os.path.join(path, item))
+    def _lock_file(self, fileobj):
+        try:
+            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
+        except IOError:
+            raise ResumeCacheConflict("{} locked".format(fileobj.name))
+class ArvPutUploader(object):
+    def __init__(self, paths):
+        self.cache = ArvPutCollectionCache(paths)
+        if self.cache.collection() is not None:
+            self.collection = ArvPutCollection(locator=self.cache.collection(), cache=self.cache)
+        else:
+            self.collection = ArvPutCollection(cache=self.cache)
+            self.cache.set_collection(self.collection.locator())
+        # # Remove uploaded 'dirty' files from collection
+        # for f in self.cache.dirty_files():
+        #     self.collection.remove_file(f)
+        #
+        # # Upload pending files
+        # for f in self.cache.pending_files():
+        #     self.collection.write_file(f, os.path.join('.', f))
+        #     self.cache.file_uploaded(f) #TODO
+        for p in paths:
+            if os.path.isdir(p):
+                self.collection.write_directory_tree(p)
+            elif os.path.isfile(p):
+                self.collection.write_file(p)
 class ArvPutCollection(object):
-    def __init__(self, cache=None, reporter=None, bytes_expected=None, **kwargs):
+    def __init__(self, locator = None, cache=None, reporter=None, 
+                    bytes_expected=None, **kwargs):
         self.collection_flush_time = 60
         self.bytes_written = 0
         self._seen_inputs = []
         self.cache = cache
         self.reporter = reporter
         self.bytes_expected = bytes_expected
-        # super(ArvPutCollection, self).__init__(**kwargs)
-        self.collection = arvados.collection.Collection()
-        self.collection.save_new()
+        if locator is None:
+            self.collection = arvados.collection.Collection()
+            self.collection.save_new()
+        else:
+            self.collection = arvados.collection.Collection(locator)
     def write_file(self, source, filename):
+        if source in self.cache.dirty_files():
+            print "DIRTY: Removing file %s to be uploaded again" % source
+            self.collection.remove(filename)
+        elif source in self.cache.uploaded_files():
+            # TODO: Check for partial uploads
+            pass
+        # if not source in self.cache.pending_files():
+        #     print "WARNING: file %s already uploaded, skipping!" % source
+        #     return
+        print "Writing file, source: %s, filename: %s" % (source, filename)
         with self.collection as c:
             with open(source, 'r') as source_fd:
                 output = c.open(filename, 'w')
@@ -299,21 +433,23 @@ class ArvPutCollection(object):
                     if not data:
-                    output.flush()
+                    output.flush() # Commit block to Keep
+                    self.bytes_written += len(data)
                     # Is it time to update the collection?
                     if (time.time() - start_time) > self.collection_flush_time:
                         start_time = time.time()
                 # File write finished
-                self.collection.save() # TODO: Is this necessary?
+                self.collection.save() # One last save...
     def write_directory_tree(self, path, stream_name='.', max_manifest_depth=-1):
         if os.path.isdir(path):
             for item in os.listdir(path):
-                if os.path.isdir(item):
+                print "Checking path: '%s' - stream_name: '%s'" % (path, stream_name)
+                if os.path.isdir(os.path.join(path, item)):
                     self.write_directory_tree(os.path.join(path, item), 
-                                    os.path.join(stream_name, path, item))
+                                    os.path.join(stream_name, item))
                     self.write_file(os.path.join(path, item), 
                                     os.path.join(stream_name, item))

commit ef06ae1ddabb8ab6977090deb756baa2e4fb6eb5
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Fri Jul 1 19:05:18 2016 -0300

    9463: Initial coding with tests

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
old mode 100644
new mode 100755
index 5cb699f..906a6bf
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -14,6 +14,7 @@ import hashlib
 import json
 import os
 import pwd
+import time
 import signal
 import socket
 import sys
@@ -276,6 +277,57 @@ class ResumeCache(object):
+class ArvPutCollection(object):
+    def __init__(self, cache=None, reporter=None, bytes_expected=None, **kwargs):
+        self.collection_flush_time = 60
+        self.bytes_written = 0
+        self._seen_inputs = []
+        self.cache = cache
+        self.reporter = reporter
+        self.bytes_expected = bytes_expected
+        # super(ArvPutCollection, self).__init__(**kwargs)
+        self.collection = arvados.collection.Collection()
+        self.collection.save_new()
+    def write_file(self, source, filename):
+        with self.collection as c:
+            with open(source, 'r') as source_fd:
+                output = c.open(filename, 'w')
+                start_time = time.time()
+                while True:
+                    data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
+                    if not data:
+                        break
+                    output.write(data)
+                    output.flush()
+                    # Is it time to update the collection?
+                    if (time.time() - start_time) > self.collection_flush_time:
+                        self.collection.save()
+                        start_time = time.time()
+                # File write finished
+                output.close()
+                self.collection.save() # TODO: Is this necessary?
+    def write_directory_tree(self, path, stream_name='.', max_manifest_depth=-1):
+        if os.path.isdir(path):
+            for item in os.listdir(path):
+                if os.path.isdir(item):
+                    self.write_directory_tree(os.path.join(path, item), 
+                                    os.path.join(stream_name, path, item))
+                else:
+                    self.write_file(os.path.join(path, item), 
+                                    os.path.join(stream_name, item))
+    def manifest(self):
+        print "BLOCK SIZE: %d" % arvados.config.KEEP_BLOCK_SIZE
+        print "MANIFEST Locator:\n%s\nMANIFEST TEXT:\n%s" % (self.collection.manifest_locator(), self.collection.manifest_text())
+        return True
+    def report_progress(self):
+        if self.reporter is not None:
+            self.reporter(self.bytes_written, self.bytes_expected)
 class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
     STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
                    ['bytes_written', '_seen_inputs'])
diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
old mode 100644
new mode 100755
index e64d914..54a70bf
--- a/sdk/python/tests/test_arv_put.py
+++ b/sdk/python/tests/test_arv_put.py
@@ -234,6 +234,39 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
                           arv_put.ResumeCache, path)
+class ArvadosPutCollectionTest(run_test_server.TestCaseWithServers):
+    MAIN_SERVER = {}
+    KEEP_SERVER = {}
+    def test_write_files(self):
+        c = arv_put.ArvPutCollection()
+        data = 'a' * 1024 * 1024 # 1 MB
+        tmpdir = tempfile.mkdtemp()
+        for size in [1, 10, 64, 128]:
+            with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
+                for _ in range(size):
+                    f.write(data)
+            c.write_file(f.name, os.path.basename(f.name))
+            os.unlink(f.name)
+        self.assertEqual(True, c.manifest())
+    def test_write_directory(self):
+        c = arv_put.ArvPutCollection()
+        data = 'b' * 1024 * 1024
+        tmpdir = tempfile.mkdtemp()
+        for size in [1, 5, 10, 70]:
+            with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
+                for _ in range(size):
+                    f.write(data)
+        os.mkdir(os.path.join(tmpdir, 'subdir1'))
+        for size in [2, 4, 6]:
+            with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f:
+                for _ in range(size):
+                    f.write(data)
+        c.write_directory_tree(tmpdir, os.path.join('.', os.path.basename(tmpdir)))
+        self.assertEqual(True, c.manifest())
 class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,
     def setUp(self):



More information about the arvados-commits mailing list