[ARVADOS] updated: 1487c0577921c88801c377d753d1322fd1485968

Git user git at public.curoverse.com
Fri Jul 22 13:52:06 EDT 2016


Summary of changes:
 sdk/python/arvados/commands/put.py | 130 ++++++--------------------
 sdk/python/tests/test_arv_put.py   | 181 ++++++++++++++++++++++---------------
 sdk/python/tests/test_arvfile.py   |   0
 3 files changed, 139 insertions(+), 172 deletions(-)
 mode change 100755 => 100644 sdk/python/arvados/commands/put.py
 mode change 100755 => 100644 sdk/python/tests/test_arv_put.py
 mode change 100755 => 100644 sdk/python/tests/test_arvfile.py

       via  1487c0577921c88801c377d753d1322fd1485968 (commit)
      from  1c3f80531764931c241cd07a3cbc56892b645bce (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 1487c0577921c88801c377d753d1322fd1485968
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Fri Jul 22 14:51:52 2016 -0300

    9463: Finished writing tests for ArvPutUploadJob

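For orientation, a minimal usage sketch (not part of the commit) of the
class these tests exercise; the file path is hypothetical, and the calls
are the ones exercised by the tests in the diff below:

    import arvados.commands.put as arv_put

    writer = arv_put.ArvPutUploadJob(['/tmp/example.txt'],
                                     resume=False,      # skip the resume cache
                                     update_time=60.0)  # seconds between checkpoints
    writer.start()                     # upload; checkpointing runs on a helper thread
    manifest = writer.manifest_text()  # manifest text for the uploaded data
    writer.save_collection()           # save it as an Arvados collection
    writer.destroy_cache()             # delete the resume cache after success
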
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
old mode 100755
new mode 100644
index d2c5214..31cb0cb
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -196,10 +196,6 @@ class ResumeCacheConflict(Exception):
     pass
 
 
-class FileUploadError(Exception):
-    pass
-
-
 class ResumeCache(object):
     CACHE_DIR = '.cache/arvados/arv-put'
 
@@ -283,15 +279,18 @@ class ResumeCache(object):
 
 
 class ArvPutUploadJob(object):
+    CACHE_DIR = '.cache/arvados/arv-put'
+
     def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
-                name=None, owner_uuid=None, ensure_unique_name=False,
-                num_retries=None, write_copies=None, replication=None,
-                filename=None):
+                 name=None, owner_uuid=None, ensure_unique_name=False,
+                 num_retries=None, write_copies=None, replication=None,
+                 filename=None, update_time=60.0):
         self.paths = paths
         self.resume = resume
         self.reporter = reporter
         self.bytes_expected = bytes_expected
         self.bytes_written = 0
+        self.bytes_skipped = 0
         self.name = name
         self.owner_uuid = owner_uuid
         self.ensure_unique_name = ensure_unique_name
@@ -302,13 +301,12 @@ class ArvPutUploadJob(object):
         self._state_lock = threading.Lock()
         self._state = None # Previous run state (file list & manifest)
         self._current_files = [] # Current run file list
-        self._cache_hash = None # MD5 digest based on paths & filename
         self._cache_file = None
         self._collection = None
         self._collection_lock = threading.Lock()
         self._stop_checkpointer = threading.Event()
         self._checkpointer = threading.Thread(target=self._update_task)
-        self._update_task_time = 60.0 # How many seconds wait between update runs
+        self._update_task_time = update_time  # Seconds to wait between update runs
         # Load cached data if any and if needed
         self._setup_state()
 
@@ -324,16 +322,18 @@ class ArvPutUploadJob(object):
                     self._write_stdin(self.filename or 'stdin')
                 elif os.path.isdir(path):
                     self._write_directory_tree(path)
-                else: #if os.path.isfile(path):
+                else:
                     self._write_file(path, self.filename or os.path.basename(path))
-                # else:
-                #     raise FileUploadError('Inadequate file type, cannot upload: %s' % path)
         finally:
             # Stop the thread before doing anything else
             self._stop_checkpointer.set()
             self._checkpointer.join()
         # Successful upload, one last _update()
         self._update()
+        if self.resume:
+            self._cache_file.close()
+            # Correct the final written bytes count
+            self.bytes_written -= self.bytes_skipped
 
     def save_collection(self):
         with self._collection_lock:
@@ -342,10 +342,11 @@ class ArvPutUploadJob(object):
                                 ensure_unique_name=self.ensure_unique_name,
                                 num_retries=self.num_retries,
                                 replication_desired=self.replication)
+
+    def destroy_cache(self):
         if self.resume:
-            # Delete cache file upon successful collection saving
             try:
-                os.unlink(self._cache_file.name)
+                os.unlink(self._cache_filename)
             except OSError as error:
                 if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                     raise
@@ -357,7 +358,7 @@ class ArvPutUploadJob(object):
         """
         size = 0
         for item in collection.values():
-            if isinstance(item, arvados.collection.Collection):
+            if isinstance(item, (arvados.collection.Collection, arvados.collection.Subcollection)):
                 size += self._collection_size(item)
             else:
                 size += item.size()
@@ -404,11 +405,9 @@ class ArvPutUploadJob(object):
             if os.path.isdir(os.path.join(path, item)):
                 self._write_directory_tree(os.path.join(path, item),
                                 os.path.join(stream_name, item))
-            elif os.path.isfile(os.path.join(path, item)):
+            else:
                 self._write_file(os.path.join(path, item),
                                 os.path.join(stream_name, item))
-            else:
-                raise FileUploadError('Inadequate file type, cannot upload: %s' % path)
 
     def _write_stdin(self, filename):
         with self._collection_lock:
@@ -441,6 +440,7 @@ class ArvPutUploadJob(object):
                 if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
                     if os.path.getsize(source) == file_in_collection.size():
                         # File already there, skip it.
+                        self.bytes_skipped += os.path.getsize(source)
                         return
                     elif os.path.getsize(source) > file_in_collection.size():
                         # File partially uploaded, resume!
@@ -458,6 +458,7 @@ class ArvPutUploadJob(object):
                     # Open for appending
                     output = self._my_collection().open(filename, 'a')
                 source_fd.seek(resume_offset)
+                self.bytes_skipped += resume_offset
             else:
                 with self._collection_lock:
                     output = self._my_collection().open(filename, 'w')
@@ -498,12 +499,14 @@ class ArvPutUploadJob(object):
             md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
             realpaths = sorted(os.path.realpath(path) for path in self.paths)
             md5.update('\0'.join(realpaths))
-            self._cache_hash = md5.hexdigest()
             if self.filename:
                 md5.update(self.filename)
+            cache_filename = md5.hexdigest()
             self._cache_file = open(os.path.join(
-                arv_cmd.make_home_conf_dir('.cache/arvados/arv-put', 0o700, 'raise'),
-                self._cache_hash), 'a+')
+                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
+                cache_filename), 'a+')
+            self._cache_filename = self._cache_file.name
+            self._lock_file(self._cache_file)
             self._cache_file.seek(0)
             with self._state_lock:
                 try:
@@ -544,13 +547,13 @@ class ArvPutUploadJob(object):
             with self._state_lock:
                 state = self._state
             new_cache_fd, new_cache_name = tempfile.mkstemp(
-                dir=os.path.dirname(self._cache_file.name))
+                dir=os.path.dirname(self._cache_filename))
             self._lock_file(new_cache_fd)
             new_cache = os.fdopen(new_cache_fd, 'r+')
             json.dump(state, new_cache)
-            # new_cache.flush()
-            # os.fsync(new_cache)
-            os.rename(new_cache_name, self._cache_file.name)
+            new_cache.flush()
+            os.fsync(new_cache)
+            os.rename(new_cache_name, self._cache_filename)
         except (IOError, OSError, ResumeCacheConflict) as error:
             try:
                 os.unlink(new_cache_name)
@@ -607,81 +610,6 @@ class ArvPutUploadJob(object):
         return datablocks
 
 
-class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
-    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
-                   ['bytes_written', '_seen_inputs'])
-
-    def __init__(self, cache=None, reporter=None, bytes_expected=None, **kwargs):
-        self.bytes_written = 0
-        self._seen_inputs = []
-        self.cache = cache
-        self.reporter = reporter
-        self.bytes_expected = bytes_expected
-        super(ArvPutCollectionWriter, self).__init__(**kwargs)
-
-    @classmethod
-    def from_cache(cls, cache, reporter=None, bytes_expected=None,
-                   num_retries=0, replication=0):
-        try:
-            state = cache.load()
-            state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
-            writer = cls.from_state(state, cache, reporter, bytes_expected,
-                                    num_retries=num_retries,
-                                    replication=replication)
-        except (TypeError, ValueError,
-                arvados.errors.StaleWriterStateError) as error:
-            return cls(cache, reporter, bytes_expected,
-                       num_retries=num_retries,
-                       replication=replication)
-        else:
-            return writer
-
-    def cache_state(self):
-        if self.cache is None:
-            return
-        state = self.dump_state()
-        # Transform attributes for serialization.
-        for attr, value in state.items():
-            if attr == '_data_buffer':
-                state[attr] = base64.encodestring(''.join(value))
-            elif hasattr(value, 'popleft'):
-                state[attr] = list(value)
-        self.cache.save(state)
-
-    def report_progress(self):
-        if self.reporter is not None:
-            self.reporter(self.bytes_written, self.bytes_expected)
-
-    def flush_data(self):
-        start_buffer_len = self._data_buffer_len
-        start_block_count = self.bytes_written / arvados.config.KEEP_BLOCK_SIZE
-        super(ArvPutCollectionWriter, self).flush_data()
-        if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
-            self.bytes_written += (start_buffer_len - self._data_buffer_len)
-            self.report_progress()
-            if (self.bytes_written / arvados.config.KEEP_BLOCK_SIZE) > start_block_count:
-                self.cache_state()
-
-    def _record_new_input(self, input_type, source_name, dest_name):
-        # The key needs to be a list because that's what we'll get back
-        # from JSON deserialization.
-        key = [input_type, source_name, dest_name]
-        if key in self._seen_inputs:
-            return False
-        self._seen_inputs.append(key)
-        return True
-
-    def write_file(self, source, filename=None):
-        if self._record_new_input('file', source, filename):
-            super(ArvPutCollectionWriter, self).write_file(source, filename)
-
-    def write_directory_tree(self,
-                             path, stream_name='.', max_manifest_depth=-1):
-        if self._record_new_input('directory', path, stream_name):
-            super(ArvPutCollectionWriter, self).write_directory_tree(
-                path, stream_name, max_manifest_depth)
-
-
 def expected_bytes_for(pathlist):
     # Walk the given directory trees and stat files, adding up file sizes,
     # so we can display progress as percent
@@ -849,6 +777,8 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
 
     if status != 0:
         sys.exit(status)
+    else:
+        writer.destroy_cache()
 
     return output
 
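Aside (not part of the patch): the checkpointer pattern above uses a
threading.Event both as a stop flag and as a cancellable sleep. A
standalone sketch, with hypothetical class and method names:

    import threading

    class CheckpointedJob(object):
        def __init__(self, update_time=60.0):
            self._interval = update_time
            self._stop_checkpointer = threading.Event()
            self._checkpointer = threading.Thread(target=self._update_task)

        def _update_task(self):
            # Event.wait() returns True as soon as the event is set, so the
            # loop both sleeps between saves and exits promptly on stop.
            while not self._stop_checkpointer.wait(self._interval):
                self._update()

        def run(self):
            self._checkpointer.start()
            try:
                pass  # ... do the actual upload work here ...
            finally:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
            self._update()  # one last save after a successful run

        def _update(self):
            pass  # persist the partial manifest / file list here
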
diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
old mode 100755
new mode 100644
index 5a4269b..87facac
--- a/sdk/python/tests/test_arv_put.py
+++ b/sdk/python/tests/test_arv_put.py
@@ -13,8 +13,8 @@ import tempfile
 import time
 import unittest
 import yaml
-import multiprocessing
 import shutil
+import threading
 import hashlib
 import random
 
@@ -238,66 +237,34 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
                           arv_put.ResumeCache, path)
 
 
-class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,
-                                     ArvadosBaseTestCase):
+class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
+                          ArvadosBaseTestCase):
     def setUp(self):
-        super(ArvadosPutCollectionWriterTest, self).setUp()
+        super(ArvPutUploadJobTest, self).setUp()
         run_test_server.authorize_with('active')
-        with tempfile.NamedTemporaryFile(delete=False) as cachefile:
-            self.cache = arv_put.ResumeCache(cachefile.name)
-            self.cache_filename = cachefile.name
+        self.exit_lock = threading.Lock()
+        self.save_manifest_lock = threading.Lock()
 
     def tearDown(self):
-        super(ArvadosPutCollectionWriterTest, self).tearDown()
-        if os.path.exists(self.cache_filename):
-            self.cache.destroy()
-        self.cache.close()
-
-    def test_writer_caches(self):
-        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
-        cwriter.write_file('/dev/null')
-        cwriter.cache_state()
-        self.assertTrue(self.cache.load())
-        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
+        super(ArvPutUploadJobTest, self).tearDown()
 
     def test_writer_works_without_cache(self):
-        cwriter = arv_put.ArvPutCollectionWriter()
-        cwriter.write_file('/dev/null')
-        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
-
-    def test_writer_resumes_from_cache(self):
-        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
-        with self.make_test_file() as testfile:
-            cwriter.write_file(testfile.name, 'test')
-            cwriter.cache_state()
-            new_writer = arv_put.ArvPutCollectionWriter.from_cache(
-                self.cache)
-            self.assertEqual(
-                ". 098f6bcd4621d373cade4e832627b4f6+4 0:4:test\n",
-                new_writer.manifest_text())
-
-    def test_new_writer_from_stale_cache(self):
-        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
-        with self.make_test_file() as testfile:
-            cwriter.write_file(testfile.name, 'test')
-        new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
-        new_writer.write_file('/dev/null')
-        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", new_writer.manifest_text())
-
-    def test_new_writer_from_empty_cache(self):
-        cwriter = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
-        cwriter.write_file('/dev/null')
+        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
+        cwriter.start()
         self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
 
-    def test_writer_resumable_after_arbitrary_bytes(self):
-        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
-        # These bytes are intentionally not valid UTF-8.
-        with self.make_test_file('\x00\x07\xe2') as testfile:
-            cwriter.write_file(testfile.name, 'test')
-            cwriter.cache_state()
-            new_writer = arv_put.ArvPutCollectionWriter.from_cache(
-                self.cache)
-        self.assertEqual(cwriter.manifest_text(), new_writer.manifest_text())
+    def test_writer_works_with_cache(self):
+        with tempfile.NamedTemporaryFile() as f:
+            f.write('foo')
+            f.flush()
+            cwriter = arv_put.ArvPutUploadJob([f.name])
+            cwriter.start()
+            self.assertEqual(3, cwriter.bytes_written)
+            # Don't destroy the cache, and start another upload
+            cwriter_new = arv_put.ArvPutUploadJob([f.name])
+            cwriter_new.start()
+            self.assertEqual(0, cwriter_new.bytes_written)
+            cwriter_new.destroy_cache()
 
     def make_progress_tester(self):
         progression = []
@@ -306,24 +273,94 @@ class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,
         return progression, record_func
 
     def test_progress_reporting(self):
-        for expect_count in (None, 8):
-            progression, reporter = self.make_progress_tester()
-            cwriter = arv_put.ArvPutCollectionWriter(
-                reporter=reporter, bytes_expected=expect_count)
-            with self.make_test_file() as testfile:
-                cwriter.write_file(testfile.name, 'test')
-            cwriter.finish_current_stream()
-            self.assertIn((4, expect_count), progression)
-
-    def test_resume_progress(self):
-        cwriter = arv_put.ArvPutCollectionWriter(self.cache, bytes_expected=4)
-        with self.make_test_file() as testfile:
-            # Set up a writer with some flushed bytes.
-            cwriter.write_file(testfile.name, 'test')
-            cwriter.finish_current_stream()
-            cwriter.cache_state()
-            new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
-            self.assertEqual(new_writer.bytes_written, 4)
+        with tempfile.NamedTemporaryFile() as f:
+            f.write('foo')
+            f.flush()
+            for expect_count in (None, 8):
+                progression, reporter = self.make_progress_tester()
+                cwriter = arv_put.ArvPutUploadJob([f.name],
+                    reporter=reporter, bytes_expected=expect_count)
+                cwriter.start()
+                cwriter.destroy_cache()
+                self.assertIn((3, expect_count), progression)
+
+    def test_writer_upload_directory(self):
+        tempdir = tempfile.mkdtemp()
+        subdir = os.path.join(tempdir, 'subdir')
+        os.mkdir(subdir)
+        data = "x" * 1024 # 1 KB
+        for i in range(1, 5):
+            with open(os.path.join(tempdir, str(i)), 'w') as f:
+                f.write(data * i)
+        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
+            f.write(data * 5)
+        cwriter = arv_put.ArvPutUploadJob([tempdir])
+        cwriter.start()
+        cwriter.destroy_cache()
+        shutil.rmtree(tempdir)
+        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)
+
+    def test_resume_large_file_upload(self):
+        # Proxy the ArvadosFile.writeto() method so we can synchronize it
+        # with the partial manifest saves
+        orig_func = getattr(arvados.arvfile.ArvadosFile, 'writeto')
+        def wrapped_func(*args, **kwargs):
+            data = args[2]
+            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+                # Block on the last (short) write call until the partial
+                # manifest has been saved
+                self.exit_lock.acquire()
+                raise SystemExit('Test exception')
+            ret = orig_func(*args, **kwargs)
+            self.save_manifest_lock.release()
+            return ret
+        setattr(arvados.arvfile.ArvadosFile, 'writeto', wrapped_func)
+        # Take advantage of the reporter feature to sync the partial
+        # manifest writing with the simulated upload error.
+        def fake_reporter(written, expected):
+            # Wait until there's something to save
+            self.save_manifest_lock.acquire()
+            # Once the partial manifest is saved, allow exiting
+            self.exit_lock.release()
+        # Create random data to be uploaded
+        md5_original = hashlib.md5()
+        _, filename = tempfile.mkstemp()
+        fileobj = open(filename, 'w')
+        # Make sure to write just a little more than one block
+        for _ in range((arvados.config.KEEP_BLOCK_SIZE/(1024*1024))+1):
+            data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
+            md5_original.update(data)
+            fileobj.write(data)
+        fileobj.close()
+        self.exit_lock.acquire()
+        self.save_manifest_lock.acquire()
+        writer = arv_put.ArvPutUploadJob([filename],
+                                         reporter=fake_reporter,
+                                         update_time=0.1)
+        # First upload: partially completed with simulated error
+        with self.assertRaises(SystemExit):
+            writer.start()
+        # Close the cache file to avoid getting a ResumeCacheConflict
+        # on the second run below
+        writer._cache_file.close()
+        self.assertLess(writer.bytes_written, os.path.getsize(filename))
+
+        # Restore the original ArvadosFile.writeto() method before retrying
+        setattr(arvados.arvfile.ArvadosFile, 'writeto', orig_func)
+        writer_new = arv_put.ArvPutUploadJob([filename])
+        writer_new.start()
+        writer_new.destroy_cache()
+        self.assertEqual(os.path.getsize(filename),
+                         writer.bytes_written + writer_new.bytes_written)
+        # Read the uploaded file to compare its md5 hash
+        md5_uploaded = hashlib.md5()
+        c = arvados.collection.Collection(writer_new.manifest_text())
+        with c.open(os.path.basename(filename), 'r') as f:
+            new_data = f.read()
+            md5_uploaded.update(new_data)
+        self.assertEqual(md5_original.hexdigest(), md5_uploaded.hexdigest())
+        # Cleaning up
+        os.unlink(filename)
 
 
 class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py
old mode 100755
new mode 100644

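Aside (not part of the patch): test_resume_large_file_upload coordinates
the writing thread and the update thread with a pair of pre-acquired
locks. A standalone sketch of that handshake, with hypothetical names:

    import threading

    exit_lock = threading.Lock()  # released once the checkpoint is saved
    save_lock = threading.Lock()  # released once there is data worth saving

    def writer_side():
        # Stands in for the wrapped ArvadosFile.writeto(): real work first.
        save_lock.release()       # signal: a full block has been written
        exit_lock.acquire()       # wait until the checkpoint is on disk
        raise SystemExit('simulated upload error')

    def checkpointer_side():
        # Stands in for the fake reporter running on the update thread.
        save_lock.acquire()       # wait for something worth saving
        # ... save the partial manifest here ...
        exit_lock.release()       # now let the writer fail

    exit_lock.acquire()           # both locks start held, as in setUp()
    save_lock.acquire()
    t = threading.Thread(target=checkpointer_side)
    t.start()
    try:
        writer_side()
    except SystemExit:
        pass                      # the test expects this controlled failure
    t.join()
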
-----------------------------------------------------------------------


hooks/post-receive
-- 

More information about the arvados-commits mailing list