[ARVADOS] updated: 3847357436978a97470e0769f3200d354a2bd08e

Git user git at public.curoverse.com
Tue Jul 5 17:53:53 EDT 2016

Summary of changes:
 sdk/python/arvados/commands/put.py | 151 +++++++++++++++++++++++++++----------
 sdk/python/tests/test_arv_put.py   |  51 +++++++++----
 2 files changed, 145 insertions(+), 57 deletions(-)

       via  3847357436978a97470e0769f3200d354a2bd08e (commit)
      from  44ea4c89930d52d142da2c0b7204fd34f61f8f39 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.

commit 3847357436978a97470e0769f3200d354a2bd08e
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Tue Jul 5 18:53:37 2016 -0300

    9463: Lots of progress today, resume upload code written, many tests to do!

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index dffa576..6e216ee 100755
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -278,8 +278,6 @@ class ResumeCache(object):
 class ArvPutCollectionCache(object):
-    CACHE_DIR = '.cache/arvados/arv-put'
     def __init__(self, paths):
         md5 = hashlib.md5()
         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
@@ -288,8 +286,12 @@ class ArvPutCollectionCache(object):
         for path in realpaths:
         # Only hash args paths
-        self.cache_hash = md5.update('\0'.join(realpaths))
-        self.cache_file = open(os.path.join(self.CACHE_DIR, self.cache_hash), 'a+')
+        md5.update('\0'.join(realpaths))
+        self.cache_hash = md5.hexdigest()
+        self.cache_file = open(os.path.join(
+            arv_cmd.make_home_conf_dir('.cache/arvados/arv-put', 0o700, 'raise'), 
+            self.cache_hash), 'a+')
         self.filename = self.cache_file.name
         self.data = self._load()
@@ -301,25 +303,56 @@ class ArvPutCollectionCache(object):
         except ValueError:
             # File empty, set up new cache
             ret = {
-                'col_uuid' : None, # Collection UUID
-                'uploaded' : [], # Uploaded file
-                'files': {}, # Complete cached file (path, mtime, size) list
+                'col_locator' : None, # Collection 
+                'uploaded' : {}, # Uploaded file list: {path : {size, mtime}}
         return ret
+    def _save(self):
+        """
+        Atomically save (create temp file & rename() it)
+        """
+        # TODO: It would be a good idea to avoid calling _save() too often
+        # when writing lots of small files.
+        print "SAVE START"
+        try:
+            new_cache_fd, new_cache_name = tempfile.mkstemp(
+                dir=os.path.dirname(self.filename))
+            self._lock_file(new_cache_fd)
+            new_cache = os.fdopen(new_cache_fd, 'r+')
+            json.dump(self.data, new_cache)
+            os.rename(new_cache_name, self.filename)
+        except (IOError, OSError, ResumeCacheConflict) as error:
+            print "SAVE ERROR: %s" % error
+            try:
+                os.unlink(new_cache_name)
+            except NameError:  # mkstemp failed.
+                pass
+        else:
+            print "SAVE DONE!! %s" % self.filename
+            self.cache_file.close()
+            self.cache_file = new_cache
+    def file_uploaded(self, path):
+        if path in self.files.keys():
+            self.data['uploaded'][path] = self.files[path]
+            self._save()
     def set_collection(self, uuid):
-        self.data['col_uuid'] = uuid
+        self.data['col_locator'] = uuid
     def collection(self):
-        return self.data['col_uuid']
+        return self.data['col_locator']
     def is_dirty(self, path):
-        if (self.files[path]['mtime'] != self.data['files'][path]['mtime']) or
-           (self.files[path]['size'] != self.data['files'][path]['size']):
+        if not path in self.data['uploaded'].keys():
+            # Cannot be dirty if it wasn't even uploaded
+            return False
+        if (self.files[path]['mtime'] != self.data['uploaded'][path]['mtime']) or (self.files[path]['size'] != self.data['uploaded'][path]['size']):
             return True
             return False
     def dirty_files(self):
@@ -327,7 +360,7 @@ class ArvPutCollectionCache(object):
         upload runs. These files should be re-uploaded.
         dirty = []
-        for f in self.data['uploaded']:
+        for f in self.data['uploaded'].keys():
             if self.is_dirty(f):
         return dirty
@@ -338,14 +371,14 @@ class ArvPutCollectionCache(object):
         upload runs. These files should be checked for partial uploads
         uploaded = []
-        for f in self.data['uploaded']:
+        for f in self.data['uploaded'].keys():
             if not self.is_dirty(f):
         return uploaded
     def pending_files(self):
-        Files that should be uploaded, because they're dirty or thet
+        Files that should be uploaded, because of being dirty or that
         never had the chance to be uploaded yet.
         pending = []
@@ -369,6 +402,16 @@ class ArvPutCollectionCache(object):
         except IOError:
             raise ResumeCacheConflict("{} locked".format(fileobj.name))
+    def close(self):
+        self.cache_file.close()
+    def destroy(self):
+        # try:
+        #     os.unlink(self.filename)
+        # except OSError as error:
+        #     if error.errno != errno.ENOENT:  # That's what we wanted anyway.
+        #         raise
+        self.close()
 class ArvPutUploader(object):
     def __init__(self, paths):
@@ -377,26 +420,23 @@ class ArvPutUploader(object):
             self.collection = ArvPutCollection(locator=self.cache.collection(), cache=self.cache)
             self.collection = ArvPutCollection(cache=self.cache)
-            self.cache.set_collection(self.collection.locator())
-        # # Remove uploaded 'dirty' files from collection
-        # for f in self.cache.dirty_files():
-        #     self.collection.remove_file(f)
-        #
-        # # Upload pending files
-        # for f in self.cache.pending_files():
-        #     self.collection.write_file(f, os.path.join('.', f))
-        #     self.cache.file_uploaded(f) #TODO
+            self.cache.set_collection(self.collection.manifest_locator())
         for p in paths:
             if os.path.isdir(p):
             elif os.path.isfile(p):
+        self.cache.destroy()
+    def manifest(self):
+        return self.collection.manifest()
+    def bytes_written(self):
+        return self.collection.bytes_written
 class ArvPutCollection(object):
-    def __init__(self, locator = None, cache=None, reporter=None, 
+    def __init__(self, locator=None, cache=None, reporter=None, 
                     bytes_expected=None, **kwargs):
         self.collection_flush_time = 60
         self.bytes_written = 0
@@ -410,23 +450,47 @@ class ArvPutCollection(object):
             self.collection = arvados.collection.Collection(locator)
+    def manifest_locator(self):
+        return self.collection.manifest_locator()
     def write_file(self, source, filename):
-        if source in self.cache.dirty_files():
-            print "DIRTY: Removing file %s to be uploaded again" % source
+        if self.cache and source in self.cache.dirty_files():
+            print "DIRTY: Removing file %s from collection to be uploaded again" % source
-        elif source in self.cache.uploaded_files():
-            # TODO: Check for partial uploads
-            pass
-        # if not source in self.cache.pending_files():
-        #     print "WARNING: file %s already uploaded, skipping!" % source
-        #     return
-        print "Writing file, source: %s, filename: %s" % (source, filename)
-        with self.collection as c:
-            with open(source, 'r') as source_fd:
-                output = c.open(filename, 'w')
+        resume_offset = 0
+        resume_upload = False
+        print "FIND file %s" % filename
+        if self.collection.find(filename):
+            print "File %s already in the collection, checking!" % source
+            if os.path.getsize(source) == self.collection.find(filename).size():
+                print "WARNING: file %s already uploaded, skipping!" % source
+                # File already there, skip it.
+                return
+            elif os.path.getsize(source) > self.collection.find(filename).size():
+                print "WARNING: RESUMING file %s" % source
+                # File partially uploaded, resume!
+                resume_upload = True
+                resume_offset = self.collection.find(filename).size()
+            else:
+                # Source file smaller than uploaded file, what happened here?
+                # TODO: Raise exception of some kind?
+                pass
+        with open(source, 'r') as source_fd:
+            with self.collection as c:
+                if resume_upload:
+                    print "Resuming file, source: %s, filename: %s" % (source, filename)
+                    output = c.open(filename, 'a')
+                    source_fd.seek(resume_offset)
+                    first_block = False
+                else:
+                    print "Writing file, source: %s, filename: %s" % (source, filename)
+                    output = c.open(filename, 'w')
+                    first_block = True
                 start_time = time.time()
                 while True:
                     data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
@@ -439,6 +503,11 @@ class ArvPutCollection(object):
                     if (time.time() - start_time) > self.collection_flush_time:
                         start_time = time.time()
+                    # Once a block is written on each file, mark it as uploaded on the cache
+                    if first_block:
+                        if self.cache:
+                            self.cache.file_uploaded(source)
+                        first_block = False
                 # File write finished
                 self.collection.save() # One last save...
@@ -456,7 +525,7 @@ class ArvPutCollection(object):
     def manifest(self):
         print "BLOCK SIZE: %d" % arvados.config.KEEP_BLOCK_SIZE
-        print "MANIFEST Locator:\n%s\nMANIFEST TEXT:\n%s" % (self.collection.manifest_locator(), self.collection.manifest_text())
+        print "MANIFEST Locator:\n%s\nMANIFEST TEXT:\n%s" % (self.manifest_locator(), self.collection.manifest_text())
         return True
     def report_progress(self):
diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
index 54a70bf..c4ce823 100755
--- a/sdk/python/tests/test_arv_put.py
+++ b/sdk/python/tests/test_arv_put.py
@@ -237,21 +237,37 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
 class ArvadosPutCollectionTest(run_test_server.TestCaseWithServers):
     MAIN_SERVER = {}
     KEEP_SERVER = {}
+    import shutil
-    def test_write_files(self):
-        c = arv_put.ArvPutCollection()
-        data = 'a' * 1024 * 1024 # 1 MB
-        tmpdir = tempfile.mkdtemp()
-        for size in [1, 10, 64, 128]:
-            with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
-                for _ in range(size):
-                    f.write(data)
-            c.write_file(f.name, os.path.basename(f.name))
-            os.unlink(f.name)
-        self.assertEqual(True, c.manifest())
-    def test_write_directory(self):
-        c = arv_put.ArvPutCollection()
+    # def test_write_files(self):
+    #     c = arv_put.ArvPutCollection()
+    #     data = 'a' * 1024 * 1024 # 1 MB
+    #     tmpdir = tempfile.mkdtemp()
+    #     for size in [1, 10, 64, 128]:
+    #         with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
+    #             for _ in range(size):
+    #                 f.write(data)
+    #         c.write_file(f.name, os.path.basename(f.name))
+    #     shutil.rmtree(tmpdir)
+    #     self.assertEqual(True, c.manifest())
+    #
+    # def test_write_directory(self):
+    #     data = 'b' * 1024 * 1024
+    #     tmpdir = tempfile.mkdtemp()
+    #     for size in [1, 5, 10, 70]:
+    #         with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
+    #             for _ in range(size):
+    #                 f.write(data)
+    #     os.mkdir(os.path.join(tmpdir, 'subdir1'))
+    #     for size in [2, 4, 6]:
+    #         with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f:
+    #             for _ in range(size):
+    #                 f.write(data)
+    #     c = arv_put.ArvPutUploader([tmpdir])
+    #     shutil.rmtree(tmpdir)
+    #     self.assertEqual(True, c.manifest())
+    def test_write_directory_twice(self):
         data = 'b' * 1024 * 1024
         tmpdir = tempfile.mkdtemp()
         for size in [1, 5, 10, 70]:
@@ -263,8 +279,11 @@ class ArvadosPutCollectionTest(run_test_server.TestCaseWithServers):
             with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f:
                 for _ in range(size):
-        c.write_directory_tree(tmpdir, os.path.join('.', os.path.basename(tmpdir)))
-        self.assertEqual(True, c.manifest())
+        c = arv_put.ArvPutUploader([tmpdir])
+        d = arv_put.ArvPutUploader([tmpdir])
+        print "ESCRIDIERON: c: %d, d: %d" % (c.bytes_written(), d.bytes_written())
+        shutil.rmtree(tmpdir)
+        self.assertEqual(0, d.bytes_written())
 class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,



More information about the arvados-commits mailing list