[ARVADOS] updated: 8be35986ae62ce39691c4c6d491601012255b1ba

Git user git at public.curoverse.com
Fri Dec 9 17:28:59 EST 2016


Summary of changes:
 sdk/python/arvados/arvfile.py      |  8 ++++
 sdk/python/arvados/commands/put.py | 49 +++++++++++++++----------
 sdk/python/tests/test_arv_put.py   | 75 ++++++++++++++++++++++++++++++++++----
 sdk/python/tests/test_arvfile.py   | 21 +++++++++++
 4 files changed, 125 insertions(+), 28 deletions(-)

       via  8be35986ae62ce39691c4c6d491601012255b1ba (commit)
       via  1c96d4eb740dc4285d9f30ba0ce73499f7b7d59e (commit)
       via  a62e41101a75e01f9b0dd7124eef81714443d8a1 (commit)
       via  a175a62538e4db86296e49e5412be41e026e73b6 (commit)
       via  8303a85b6c3fed92710a590fa5653557b73439cf (commit)
       via  90de410ff977f3669d33f532adf0cb71ac8ac261 (commit)
      from  2225be2cb92613495450528ba24a9ca14f232748 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 8be35986ae62ce39691c4c6d491601012255b1ba
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Fri Dec 9 19:27:19 2016 -0300

    10383: Added 2 tests to check that no resuming takes place when using either --no-resume or --no-cache.

diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
index f42c0fc..0c1d377 100644
--- a/sdk/python/tests/test_arv_put.py
+++ b/sdk/python/tests/test_arv_put.py
@@ -324,6 +324,7 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                                              replication_desired=1)
             with self.assertRaises(SystemExit):
                 writer.start(save_collection=False)
+            # Confirm that the file was partially uploaded
             self.assertGreater(writer.bytes_written, 0)
             self.assertLess(writer.bytes_written,
                             os.path.getsize(self.large_file_name))
@@ -335,6 +336,65 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          os.path.getsize(self.large_file_name))
         writer2.destroy_cache()
 
+    def test_no_resume_when_asked(self):
+        def wrapped_write(*args, **kwargs):
+            data = args[1]
+            # Exit only on last block
+            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+                raise SystemExit("Simulated error")
+            return self.arvfile_write(*args, **kwargs)
+
+        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+                        autospec=True) as mocked_write:
+            mocked_write.side_effect = wrapped_write
+            writer = arv_put.ArvPutUploadJob([self.large_file_name],
+                                             replication_desired=1)
+            with self.assertRaises(SystemExit):
+                writer.start(save_collection=False)
+            # Confirm that the file was partially uploaded
+            self.assertGreater(writer.bytes_written, 0)
+            self.assertLess(writer.bytes_written,
+                            os.path.getsize(self.large_file_name))
+        # Retry the upload, this time without resume
+        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1,
+                                          resume=False)
+        writer2.start(save_collection=False)
+        self.assertEqual(writer2.bytes_skipped, 0)
+        self.assertEqual(writer2.bytes_written,
+                         os.path.getsize(self.large_file_name))
+        writer2.destroy_cache()
+
+    def test_no_resume_when_no_cache(self):
+        def wrapped_write(*args, **kwargs):
+            data = args[1]
+            # Exit only on last block
+            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+                raise SystemExit("Simulated error")
+            return self.arvfile_write(*args, **kwargs)
+
+        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+                        autospec=True) as mocked_write:
+            mocked_write.side_effect = wrapped_write
+            writer = arv_put.ArvPutUploadJob([self.large_file_name],
+                                             replication_desired=1)
+            with self.assertRaises(SystemExit):
+                writer.start(save_collection=False)
+            # Confirm that the file was partially uploaded
+            self.assertGreater(writer.bytes_written, 0)
+            self.assertLess(writer.bytes_written,
+                            os.path.getsize(self.large_file_name))
+        # Retry the upload, this time without cache usage
+        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1,
+                                          resume=False,
+                                          use_cache=False)
+        writer2.start(save_collection=False)
+        self.assertEqual(writer2.bytes_skipped, 0)
+        self.assertEqual(writer2.bytes_written,
+                         os.path.getsize(self.large_file_name))
+        writer2.destroy_cache()
+
 
 class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
     TEST_SIZE = os.path.getsize(__file__)

commit 1c96d4eb740dc4285d9f30ba0ce73499f7b7d59e
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Fri Dec 9 19:26:51 2016 -0300

    10383: Exception usage correction

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 8995ea9..88956cd 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -222,6 +222,8 @@ class CollectionUpdateError(Exception):
 class ResumeCacheConflict(Exception):
     pass
 
+class ArvPutArgumentConflict(Exception):
+    pass
 
 class ResumeCache(object):
     CACHE_DIR = '.cache/arvados/arv-put'
@@ -345,7 +347,7 @@ class ArvPutUploadJob(object):
         self.logger = logging.getLogger('arvados.arv_put')
 
         if not self.use_cache and self.resume:
-            raise ArgumentError('resume cannot be True when use_cache is False')
+            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
 
         # Load cached data if any and if needed
         self._setup_state(update_collection)

commit a62e41101a75e01f9b0dd7124eef81714443d8a1
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Fri Dec 9 18:36:29 2016 -0300

    10383: Refactored the file upload decision code so that it first skims through
    the entire list, instead of deciding on a file-by-file basis. This allows
    pre-calculating how many bytes are going to be skipped when resuming, and that
    allows for a precise way of showing the upload progress report.
    Also updated affected tests so they pass with this new way of counting bytes.

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 3e82bdf..8995ea9 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -341,6 +341,7 @@ class ArvPutUploadJob(object):
         self._stop_checkpointer = threading.Event()
         self._checkpointer = threading.Thread(target=self._update_task)
         self._update_task_time = update_time  # How many seconds wait between update runs
+        self._files_to_upload = []
         self.logger = logging.getLogger('arvados.arv_put')
 
         if not self.use_cache and self.resume:
@@ -371,11 +372,16 @@ class ArvPutUploadJob(object):
                         dirs.sort()
                         files.sort()
                         for f in files:
-                            self._write_file(os.path.join(root, f),
+                            self._check_file(os.path.join(root, f),
                                              os.path.join(root[len(prefixdir):], f))
                 else:
-                    self._write_file(os.path.abspath(path),
+                    self._check_file(os.path.abspath(path),
                                      self.filename or os.path.basename(path))
+            # Update bytes_written from current local collection and
+            # report initial progress.
+            self._update()
+            # Actual file upload
+            self._upload_files()
         finally:
             # Stop the thread before doing anything else
             self._stop_checkpointer.set()
@@ -387,8 +393,6 @@ class ArvPutUploadJob(object):
                 self._cache_file.close()
             if save_collection:
                 self.save_collection()
-            # Correct the final written bytes count
-            self.bytes_written -= self.bytes_skipped
 
     def save_collection(self):
         if self.update:
@@ -468,11 +472,11 @@ class ArvPutUploadJob(object):
         self._write(sys.stdin, output)
         output.close()
 
-    def _write_file(self, source, filename):
+    def _check_file(self, source, filename):
+        """Check if this file needs to be uploaded"""
         resume_offset = 0
         should_upload = False
         new_file_in_cache = False
-
         # Record file path for updating the remote collection before exiting
         self._file_paths.append(filename)
 
@@ -505,22 +509,31 @@ class ArvPutUploadJob(object):
                 # Permission token expired, re-upload file. This will change whenever
                 # we have a API for refreshing tokens.
                 should_upload = True
+                self._local_collection.remove(filename)
             elif cached_file_data['size'] == file_in_local_collection.size():
                 # File already there, skip it.
                 self.bytes_skipped += cached_file_data['size']
             elif cached_file_data['size'] > file_in_local_collection.size():
                 # File partially uploaded, resume!
                 resume_offset = file_in_local_collection.size()
+                self.bytes_skipped += resume_offset
                 should_upload = True
             else:
                 # Inconsistent cache, re-upload the file
                 should_upload = True
+                self._local_collection.remove(filename)
                 self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
         # Local file differs from cached data, re-upload it.
         else:
+            if file_in_local_collection:
+                self._local_collection.remove(filename)
             should_upload = True
 
         if should_upload:
+            self._files_to_upload.append((source, resume_offset, filename))
+
+    def _upload_files(self):
+        for source, resume_offset, filename in self._files_to_upload:
             with open(source, 'r') as source_fd:
                 with self._state_lock:
                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
@@ -529,7 +542,6 @@ class ArvPutUploadJob(object):
                     # Start upload where we left off
                     output = self._local_collection.open(filename, 'a')
                     source_fd.seek(resume_offset)
-                    self.bytes_skipped += resume_offset
                 else:
                     # Start from scratch
                     output = self._local_collection.open(filename, 'w')
@@ -594,9 +606,6 @@ class ArvPutUploadJob(object):
                 self._state = copy.deepcopy(self.EMPTY_STATE)
             # Load the previous manifest so we can check if files were modified remotely.
             self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired)
-        # Load how many bytes were uploaded on previous run
-        with self._collection_lock:
-            self.bytes_written = self._collection_size(self._local_collection)
 
     def _lock_file(self, fileobj):
         try:
diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
index fee32a3..f42c0fc 100644
--- a/sdk/python/tests/test_arv_put.py
+++ b/sdk/python/tests/test_arv_put.py
@@ -278,12 +278,12 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
             f.flush()
             cwriter = arv_put.ArvPutUploadJob([f.name])
             cwriter.start(save_collection=False)
-            self.assertEqual(3, cwriter.bytes_written)
+            self.assertEqual(3, cwriter.bytes_written - cwriter.bytes_skipped)
             # Don't destroy the cache, and start another upload
             cwriter_new = arv_put.ArvPutUploadJob([f.name])
             cwriter_new.start(save_collection=False)
             cwriter_new.destroy_cache()
-            self.assertEqual(0, cwriter_new.bytes_written)
+            self.assertEqual(0, cwriter_new.bytes_written - cwriter_new.bytes_skipped)
 
     def make_progress_tester(self):
         progression = []
@@ -324,13 +324,14 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                                              replication_desired=1)
             with self.assertRaises(SystemExit):
                 writer.start(save_collection=False)
-                self.assertLess(writer.bytes_written,
-                                os.path.getsize(self.large_file_name))
+            self.assertGreater(writer.bytes_written, 0)
+            self.assertLess(writer.bytes_written,
+                            os.path.getsize(self.large_file_name))
         # Retry the upload
         writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                           replication_desired=1)
         writer2.start(save_collection=False)
-        self.assertEqual(writer.bytes_written + writer2.bytes_written,
+        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                          os.path.getsize(self.large_file_name))
         writer2.destroy_cache()
 

commit a175a62538e4db86296e49e5412be41e026e73b6
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Thu Dec 8 17:56:44 2016 -0300

    10383: Check for expired tokens when getting already uploaded files.

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 610fd7d..eadb3a9 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -760,6 +760,14 @@ class ArvadosFile(object):
         return self.parent.writable()
 
     @synchronized
+    def permission_expired(self, as_of_dt=None):
+        """Returns True if any of the segment's locators is expired"""
+        for r in self._segments:
+            if KeepLocator(r.locator).permission_expired(as_of_dt):
+                return True
+        return False
+
+    @synchronized
     def segments(self):
         return copy.copy(self._segments)
 
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index e5c85a9..3e82bdf 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -501,6 +501,10 @@ class ArvPutUploadJob(object):
             if not file_in_local_collection:
                 # File not uploaded yet, upload it completely
                 should_upload = True
+            elif file_in_local_collection.permission_expired():
+                # Permission token expired, re-upload file. This will change whenever
+                # we have a API for refreshing tokens.
+                should_upload = True
             elif cached_file_data['size'] == file_in_local_collection.size():
                 # File already there, skip it.
                 self.bytes_skipped += cached_file_data['size']
diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py
index 6b35626..8f02d51 100644
--- a/sdk/python/tests/test_arvfile.py
+++ b/sdk/python/tests/test_arvfile.py
@@ -1,6 +1,7 @@
 #!/usr/bin/env python
 
 import bz2
+import datetime
 import gzip
 import io
 import mock
@@ -570,6 +571,26 @@ class ArvadosFileReadlinesTestCase(ArvadosFileReadTestCase):
     def read_for_test(self, reader, byte_count, **kwargs):
         return ''.join(reader.readlines(**kwargs))
 
+
+class ArvadosFileTestCase(unittest.TestCase):
+    def datetime_to_hex(self, dt):
+        return hex(int(time.mktime(dt.timetuple())))[2:]
+
+    def test_permission_expired(self):
+        base_manifest = ". 781e5e245d69b566979b86e28d23f2c7+10+A715fd31f8111894f717eb1003c1b0216799dd9ec@{} 0:10:count.txt\n"
+        now = datetime.datetime.now()
+        a_week_ago = now - datetime.timedelta(days=7)
+        a_month_ago = now - datetime.timedelta(days=30)
+        a_week_from_now = now + datetime.timedelta(days=7)
+        with Collection(base_manifest.format(self.datetime_to_hex(a_week_from_now))) as c:
+            self.assertFalse(c.find('count.txt').permission_expired())
+        with Collection(base_manifest.format(self.datetime_to_hex(a_week_ago))) as c:
+            f = c.find('count.txt')
+            self.assertTrue(f.permission_expired())
+            self.assertTrue(f.permission_expired(a_week_from_now))
+            self.assertFalse(f.permission_expired(a_month_ago))
+
+
 class BlockManagerTest(unittest.TestCase):
     def test_bufferblock_append(self):
         keep = ArvadosFileWriterTestCase.MockKeep({})

commit 8303a85b6c3fed92710a590fa5653557b73439cf
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Thu Dec 8 13:27:20 2016 -0300

    10383: Changed the time between updates from 1 to 20 secs

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 8f7a7b7..e5c85a9 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -315,7 +315,7 @@ class ArvPutUploadJob(object):
     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                  bytes_expected=None, name=None, owner_uuid=None,
                  ensure_unique_name=False, num_retries=None, replication_desired=None,
-                 filename=None, update_time=1.0, update_collection=None):
+                 filename=None, update_time=20.0, update_collection=None):
         self.paths = paths
         self.resume = resume
         self.use_cache = use_cache

commit 90de410ff977f3669d33f532adf0cb71ac8ac261
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Thu Dec 8 13:25:06 2016 -0300

    10383: Removed deprecated --max-manifest-depth argument from the help message and ignored its value in make_path.

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index c7de888..8f7a7b7 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -40,13 +40,7 @@ Local file or directory. Default: read from standard input.
 _group = upload_opts.add_mutually_exclusive_group()
 
 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
-                    default=-1, help="""
-Maximum depth of directory tree to represent in the manifest
-structure. A directory structure deeper than this will be represented
-as a single stream in the manifest. If N=0, the manifest will contain
-a single stream. Default: -1 (unlimited), i.e., exactly one manifest
-stream per filesystem directory that contains files.
-""")
+                    default=-1, help=argparse.SUPPRESS)
 
 _group.add_argument('--normalize', action='store_true',
                     help="""
@@ -244,7 +238,7 @@ class ResumeCache(object):
         realpaths = sorted(os.path.realpath(path) for path in args.paths)
         md5.update('\0'.join(realpaths))
         if any(os.path.isdir(path) for path in realpaths):
-            md5.update(str(max(args.max_manifest_depth, -1)))
+            md5.update("-1")
         elif args.filename:
             md5.update(args.filename)
         return os.path.join(
diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
index e4b2724..fee32a3 100644
--- a/sdk/python/tests/test_arv_put.py
+++ b/sdk/python/tests/test_arv_put.py
@@ -31,9 +31,7 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
         [],
         ['/dev/null'],
         ['/dev/null', '--filename', 'empty'],
-        ['/tmp'],
-        ['/tmp', '--max-manifest-depth', '0'],
-        ['/tmp', '--max-manifest-depth', '1']
+        ['/tmp']
         ]
 
     def tearDown(self):

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list