[ARVADOS] updated: e69ce852e1cbbe5bab82e32ec5d1874ef5a768f3

Git user git at public.curoverse.com
Fri Jul 29 09:10:51 EDT 2016


Summary of changes:
 sdk/python/arvados/arvfile.py        | 10 ++--
 sdk/python/arvados/collection.py     | 40 +++++++++-------
 sdk/python/arvados/commands/put.py   | 91 +++++++++++++++---------------------
 sdk/python/tests/test_arv_put.py     | 90 +++++++++++++++++++----------------
 sdk/python/tests/test_arvfile.py     |  5 +-
 sdk/python/tests/test_collections.py | 18 +++++++
 6 files changed, 137 insertions(+), 117 deletions(-)

       via  e69ce852e1cbbe5bab82e32ec5d1874ef5a768f3 (commit)
       via  62d56bce0d714cc2df2ab5e7f1005dc3d76f783b (commit)
       via  69902ee6583e1de32786e80b77c8f61870ed6f90 (commit)
       via  9a363fce5687e55c5554b3eaeee16e7f1f0791f6 (commit)
       via  2b52a4885952a8a3eed01b03af33210fc86d6ce5 (commit)
       via  05e30c2874cfee6e448de38254d4eb6007abb1cd (commit)
       via  bca6c8e4c3d880955d19f7b6ff50bc3fbc31146c (commit)
      from  1487c0577921c88801c377d753d1322fd1485968 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit e69ce852e1cbbe5bab82e32ec5d1874ef5a768f3
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Fri Jul 29 10:08:36 2016 -0300

    9463: Unify replication_desired & write_copies parameters into one, passing it to the Collection constructor and letting the class decide the best default value when None is passed.

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index f076773..b2c40f1 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -289,7 +289,7 @@ class ArvPutUploadJob(object):
 
     def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
                  name=None, owner_uuid=None, ensure_unique_name=False,
-                 num_retries=None, write_copies=None, replication=None,
+                 num_retries=None, replication_desired=None,
                  filename=None, update_time=60.0):
         self.paths = paths
         self.resume = resume
@@ -301,8 +301,7 @@ class ArvPutUploadJob(object):
         self.owner_uuid = owner_uuid
         self.ensure_unique_name = ensure_unique_name
         self.num_retries = num_retries
-        self.write_copies = write_copies
-        self.replication = replication
+        self.replication_desired = replication_desired
         self.filename = filename
         self._state_lock = threading.Lock()
         self._state = None # Previous run state (file list & manifest)
@@ -345,17 +344,17 @@ class ArvPutUploadJob(object):
     def save_collection(self):
         with self._collection_lock:
             self._my_collection().save_new(
-                                name=self.name, owner_uuid=self.owner_uuid,
-                                ensure_unique_name=self.ensure_unique_name,
-                                num_retries=self.num_retries,
-                                replication_desired=self.replication)
+                name=self.name, owner_uuid=self.owner_uuid,
+                ensure_unique_name=self.ensure_unique_name,
+                num_retries=self.num_retries)
 
     def destroy_cache(self):
         if self.resume:
             try:
                 os.unlink(self._cache_filename)
             except OSError as error:
-                if error.errno != errno.ENOENT:  # That's what we wanted anyway.
+                # That's what we wanted anyway.
+                if error.errno != errno.ENOENT:
                     raise
             self._cache_file.close()
 
@@ -440,7 +439,8 @@ class ArvPutUploadJob(object):
                         'mtime': os.path.getmtime(source),
                         'size' : os.path.getsize(source)
                     }
-            cached_file_data = self._state['files'][source]
+            with self._state_lock:
+                cached_file_data = self._state['files'][source]
             # See if this file was already uploaded at least partially
             if file_in_collection:
                 if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
@@ -488,12 +488,12 @@ class ArvPutUploadJob(object):
             if self.resume and manifest is not None:
                 # Create collection from saved state
                 self._collection = arvados.collection.Collection(
-                                        manifest,
-                                        num_write_copies=self.write_copies)
+                    manifest,
+                    replication_desired=self.replication_desired)
             else:
                 # Create new collection
                 self._collection = arvados.collection.Collection(
-                                        num_write_copies=self.write_copies)
+                    replication_desired=self.replication_desired)
         return self._collection
 
     def _setup_state(self):
@@ -688,19 +688,6 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         print >>stderr, error
         sys.exit(1)
 
-    # write_copies diverges from args.replication here.
-    # args.replication is how many copies we will instruct Arvados to
-    # maintain (by passing it in collections().create()) after all
-    # data is written -- and if None was given, we'll use None there.
-    # Meanwhile, write_copies is how many copies of each data block we
-    # write to Keep, which has to be a number.
-    #
-    # If we simply changed args.replication from None to a default
-    # here, we'd end up erroneously passing the default replication
-    # level (instead of None) to collections().create().
-    write_copies = (args.replication or
-                    api_client._rootDesc.get('defaultCollectionReplication', 2))
-
     if args.progress:
         reporter = progress_writer(human_progress)
     elif args.batch_progress:
@@ -715,8 +702,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
                                 reporter = reporter,
                                 bytes_expected = bytes_expected,
                                 num_retries = args.retries,
-                                write_copies = write_copies,
-                                replication = args.replication,
+                                replication_desired = args.replication,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True)
@@ -776,9 +762,9 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
 
     if status != 0:
         sys.exit(status)
-    else:
-        writer.destroy_cache()
 
+    # Success!
+    writer.destroy_cache()
     return output
 
 

commit 62d56bce0d714cc2df2ab5e7f1005dc3d76f783b
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Thu Jul 28 14:58:12 2016 -0300

    9463: Unified use of 'replication_desired' param on Collection class at instantiation.
    
    Removed the need to pass the number of copies to be written to Keep on the save_new() method;
    it will be inferred from the replication_desired setting or looked up from defaults.
    
    Added functionality to Collection class to keep replication_desired configuration when
    loading an already existing collection from API server, with tests validating this new
    behaviour.
    
    Corrected some already existing tests to work with these changes.

diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 098427c..62b6526 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -1136,7 +1136,7 @@ class Collection(RichCollectionBase):
                  parent=None,
                  apiconfig=None,
                  block_manager=None,
-                 num_write_copies=None):
+                 replication_desired=None):
         """Collection constructor.
 
         :manifest_locator_or_text:
@@ -1144,24 +1144,35 @@ class Collection(RichCollectionBase):
           a manifest, raw manifest text, or None (to create an empty collection).
         :parent:
           the parent Collection, may be None.
+
         :apiconfig:
           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
           Prefer this over supplying your own api_client and keep_client (except in testing).
           Will use default config settings if not specified.
+
         :api_client:
           The API client object to use for requests.  If not specified, create one using `apiconfig`.
+
         :keep_client:
           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
+
         :num_retries:
           the number of retries for API and Keep requests.
+
         :block_manager:
           the block manager to use.  If not specified, create one.
 
+        :replication_desired:
+          How many copies should Arvados maintain. If None, API server default
+          configuration applies. If not None, this value will also be used
+          for determining the number of block copies being written.
+
         """
         super(Collection, self).__init__(parent)
         self._api_client = api_client
         self._keep_client = keep_client
         self._block_manager = block_manager
+        self.replication_desired = replication_desired
 
         if apiconfig:
             self._config = apiconfig
@@ -1169,7 +1180,6 @@ class Collection(RichCollectionBase):
             self._config = config.settings()
 
         self.num_retries = num_retries if num_retries is not None else 0
-        self.num_write_copies = num_write_copies
         self._manifest_locator = None
         self._manifest_text = None
         self._api_response = None
@@ -1234,7 +1244,8 @@ class Collection(RichCollectionBase):
     def _my_api(self):
         if self._api_client is None:
             self._api_client = ThreadSafeApiCache(self._config)
-            self._keep_client = self._api_client.keep
+            if self._keep_client is None:
+                self._keep_client = self._api_client.keep
         return self._api_client
 
     @synchronized
@@ -1249,7 +1260,10 @@ class Collection(RichCollectionBase):
     @synchronized
     def _my_block_manager(self):
         if self._block_manager is None:
-            self._block_manager = _BlockManager(self._my_keep(), copies=self.num_write_copies)
+            copies = (self.replication_desired or
+                      self._my_api()._rootDesc.get('defaultCollectionReplication',
+                                                   2))
+            self._block_manager = _BlockManager(self._my_keep(), copies=copies)
         return self._block_manager
 
     def _remember_api_response(self, response):
@@ -1269,6 +1283,10 @@ class Collection(RichCollectionBase):
                 uuid=self._manifest_locator).execute(
                     num_retries=self.num_retries))
             self._manifest_text = self._api_response['manifest_text']
+            # If not overriden via kwargs, we should try to load the
+            # replication_desired from the API server
+            if self.replication_desired is None:
+                self.replication_desired = self._api_response.get('replication_desired', None)
             return None
         except Exception as e:
             return e
@@ -1442,8 +1460,7 @@ class Collection(RichCollectionBase):
                  create_collection_record=True,
                  owner_uuid=None,
                  ensure_unique_name=False,
-                 num_retries=None,
-                 replication_desired=None):
+                 num_retries=None):
         """Save collection to a new collection record.
 
         Commit pending buffer blocks to Keep and, when create_collection_record
@@ -1470,27 +1487,18 @@ class Collection(RichCollectionBase):
         :num_retries:
           Retry count on API calls (if None,  use the collection default)
 
-        :replication_desired:
-          How many copies should Arvados maintain. If None, API server default
-          configuration applies.
-
         """
         self._my_block_manager().commit_all()
         text = self.manifest_text(strip=False)
 
         if create_collection_record:
-            replication_attr = 'replication_desired'
-            if self._my_api()._schema.schemas['Collection']['properties'].get(replication_attr, None) is None:
-                # API called it 'redundancy' before #3410.
-                replication_attr = 'redundancy'
-
             if name is None:
                 name = "New collection"
                 ensure_unique_name = True
 
             body = {"manifest_text": text,
                     "name": name,
-                    replication_attr: replication_desired}
+                    "replication_desired": self.replication_desired}
             if owner_uuid:
                 body["owner_uuid"] = owner_uuid
 
diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py
index 2e43216..6b35626 100644
--- a/sdk/python/tests/test_arvfile.py
+++ b/sdk/python/tests/test_arvfile.py
@@ -28,7 +28,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
         def get_from_cache(self, locator):
             self.requests.append(locator)
             return self.blocks.get(locator)
-        def put(self, data, num_retries=None):
+        def put(self, data, num_retries=None, copies=None):
             pdh = tutil.str_keep_locator(data)
             self.blocks[pdh] = str(data)
             return pdh
@@ -38,6 +38,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
             self.body = b
             self.response = r
             self._schema = ArvadosFileWriterTestCase.MockApi.MockSchema()
+            self._rootDesc = {}
         class MockSchema(object):
             def __init__(self):
                 self.schemas = {'Collection': {'properties': {'replication_desired': {'type':'integer'}}}}
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index ff0d684..41c8c01 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -804,6 +804,24 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
 
 class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
 
+    def test_replication_desired_kept_on_load(self):
+        m = '. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt 0:10:count2.txt\n'
+        c1 = Collection(m, replication_desired=1)
+        c1.save_new()
+        loc = c1.manifest_locator()
+        c2 = Collection(loc)
+        self.assertEqual(c1.manifest_text, c2.manifest_text)
+        self.assertEqual(c1.replication_desired, c2.replication_desired)
+
+    def test_replication_desired_not_loaded_if_provided(self):
+        m = '. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt 0:10:count2.txt\n'
+        c1 = Collection(m, replication_desired=1)
+        c1.save_new()
+        loc = c1.manifest_locator()
+        c2 = Collection(loc, replication_desired=2)
+        self.assertEqual(c1.manifest_text, c2.manifest_text)
+        self.assertNotEqual(c1.replication_desired, c2.replication_desired)
+
     def test_init_manifest(self):
         m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
 . 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt

commit 69902ee6583e1de32786e80b77c8f61870ed6f90
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Wed Jul 27 09:44:18 2016 -0300

    9463: Parameter naming unification on _BlockManager. Corrected mocked discovery doc on test.

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 90db6ce..f2f7df2 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -414,7 +414,7 @@ class _BlockManager(object):
         self.prefetch_enabled = True
         self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
-        self.num_put_copies = copies
+        self.copies = copies
 
     @synchronized
     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
@@ -465,10 +465,10 @@ class _BlockManager(object):
                 if bufferblock is None:
                     return
 
-                if self.num_put_copies is None:
+                if self.copies is None:
                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                 else:
-                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.num_put_copies)
+                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
 
             except Exception as e:
@@ -581,10 +581,10 @@ class _BlockManager(object):
 
         if sync:
             try:
-                if self.num_put_copies is None:
+                if self.copies is None:
                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
                 else:
-                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.num_put_copies)
+                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
                 block.set_state(_BufferBlock.COMMITTED, loc)
             except Exception as e:
                 block.set_state(_BufferBlock.ERROR, e)
diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py
index da8bf68..2e43216 100644
--- a/sdk/python/tests/test_arvfile.py
+++ b/sdk/python/tests/test_arvfile.py
@@ -40,7 +40,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
             self._schema = ArvadosFileWriterTestCase.MockApi.MockSchema()
         class MockSchema(object):
             def __init__(self):
-                self.schemas = {'Collection': {'properties': {'replication_desired': 2}}}
+                self.schemas = {'Collection': {'properties': {'replication_desired': {'type':'integer'}}}}
         class MockCollections(object):
             def __init__(self, b, r):
                 self.body = b

commit 9a363fce5687e55c5554b3eaeee16e7f1f0791f6
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Tue Jul 26 19:39:27 2016 -0300

    9463: Temp files & dirs creation/cleanup moved to setUp and tearDown stages. Replaced usage of reporter function with a second wrapper function to make the test cleaner

diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
index 87facac..7d8d790 100644
--- a/sdk/python/tests/test_arv_put.py
+++ b/sdk/python/tests/test_arv_put.py
@@ -244,9 +244,29 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
         run_test_server.authorize_with('active')
         self.exit_lock = threading.Lock()
         self.save_manifest_lock = threading.Lock()
+        # Temp files creation
+        self.tempdir = tempfile.mkdtemp()
+        subdir = os.path.join(self.tempdir, 'subdir')
+        os.mkdir(subdir)
+        data = "x" * 1024 # 1 KB
+        for i in range(1, 5):
+            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
+                f.write(data * i)
+        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
+            f.write(data * 5)
+        # For large file resuming test
+        _, self.large_file_name = tempfile.mkstemp()
+        fileobj = open(self.large_file_name, 'w')
+        # Make sure to write just a little more than one block
+        for _ in range((arvados.config.KEEP_BLOCK_SIZE/(1024*1024))+1):
+            data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
+            fileobj.write(data)
+        fileobj.close()
 
     def tearDown(self):
         super(ArvPutUploadJobTest, self).tearDown()
+        shutil.rmtree(self.tempdir)
+        os.unlink(self.large_file_name)
 
     def test_writer_works_without_cache(self):
         cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
@@ -285,57 +305,43 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                 self.assertIn((3, expect_count), progression)
 
     def test_writer_upload_directory(self):
-        tempdir = tempfile.mkdtemp()
-        subdir = os.path.join(tempdir, 'subdir')
-        os.mkdir(subdir)
-        data = "x" * 1024 # 1 KB
-        for i in range(1, 5):
-            with open(os.path.join(tempdir, str(i)), 'w') as f:
-                f.write(data * i)
-        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
-            f.write(data * 5)
-        cwriter = arv_put.ArvPutUploadJob([tempdir])
+        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
         cwriter.start()
         cwriter.destroy_cache()
-        shutil.rmtree(tempdir)
         self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)
 
     def test_resume_large_file_upload(self):
         # Proxying ArvadosFile.writeto() method to be able to synchronize it
         # with partial manifest saves
-        orig_func = getattr(arvados.arvfile.ArvadosFile, 'writeto')
-        def wrapped_func(*args, **kwargs):
+        orig_writeto_func = getattr(arvados.arvfile.ArvadosFile, 'writeto')
+        orig_update_func = getattr(arv_put.ArvPutUploadJob, '_update')
+        def wrapped_update(*args, **kwargs):
+            job_instance = args[0]
+            orig_update_func(*args, **kwargs)
+            with self.save_manifest_lock:
+                # Allow abnormal termination when first block written
+                if job_instance._collection_size(job_instance._my_collection()) == arvados.config.KEEP_BLOCK_SIZE:
+                    self.exit_lock.release()
+        def wrapped_writeto(*args, **kwargs):
             data = args[2]
             if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                 # Lock on the last block write call, waiting for the
                 # manifest to be saved
-                self.exit_lock.acquire()
-                raise SystemExit('Test exception')
-            ret = orig_func(*args, **kwargs)
+                with self.exit_lock:
+                    raise SystemExit('Test exception')
+            ret = orig_writeto_func(*args, **kwargs)
             self.save_manifest_lock.release()
             return ret
-        setattr(arvados.arvfile.ArvadosFile, 'writeto', wrapped_func)
-        # Take advantage of the reporter feature to sync the partial
-        # manifest writing with the simulated upload error.
-        def fake_reporter(written, expected):
-            # Wait until there's something to save
-            self.save_manifest_lock.acquire()
-            # Once the partial manifest is saved, allow exiting
-            self.exit_lock.release()
-        # Create random data to be uploaded
+        setattr(arvados.arvfile.ArvadosFile, 'writeto', wrapped_writeto)
+        setattr(arv_put.ArvPutUploadJob, '_update', wrapped_update)
+        # MD5 hash of random data to be uploaded
         md5_original = hashlib.md5()
-        _, filename = tempfile.mkstemp()
-        fileobj = open(filename, 'w')
-        # Make sure to write just a little more than one block
-        for _ in range((arvados.config.KEEP_BLOCK_SIZE/(1024*1024))+1):
-            data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
+        with open(self.large_file_name, 'r') as f:
+            data = f.read()
             md5_original.update(data)
-            fileobj.write(data)
-        fileobj.close()
         self.exit_lock.acquire()
         self.save_manifest_lock.acquire()
-        writer = arv_put.ArvPutUploadJob([filename],
-                                         reporter=fake_reporter,
+        writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                          update_time=0.1)
         # First upload: partially completed with simulated error
         try:
@@ -343,24 +349,26 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
         except SystemExit:
             # Avoid getting a ResumeCacheConflict on the 2nd run
             writer._cache_file.close()
-        self.assertLess(writer.bytes_written, os.path.getsize(filename))
+        self.assertGreater(writer.bytes_written, 0)
+        self.assertLess(writer.bytes_written,
+                        os.path.getsize(self.large_file_name))
 
         # Restore the ArvadosFile.writeto() method to before retrying
-        setattr(arvados.arvfile.ArvadosFile, 'writeto', orig_func)
-        writer_new = arv_put.ArvPutUploadJob([filename])
+        setattr(arvados.arvfile.ArvadosFile, 'writeto', orig_writeto_func)
+        # Restore the ArvPutUploadJob._update() method to before retrying
+        setattr(arv_put.ArvPutUploadJob, '_update', orig_update_func)
+        writer_new = arv_put.ArvPutUploadJob([self.large_file_name])
         writer_new.start()
         writer_new.destroy_cache()
-        self.assertEqual(os.path.getsize(filename),
+        self.assertEqual(os.path.getsize(self.large_file_name),
                          writer.bytes_written + writer_new.bytes_written)
         # Read the uploaded file to compare its md5 hash
         md5_uploaded = hashlib.md5()
         c = arvados.collection.Collection(writer_new.manifest_text())
-        with c.open(os.path.basename(filename), 'r') as f:
+        with c.open(os.path.basename(self.large_file_name), 'r') as f:
             new_data = f.read()
             md5_uploaded.update(new_data)
         self.assertEqual(md5_original.hexdigest(), md5_uploaded.hexdigest())
-        # Cleaning up
-        os.unlink(filename)
 
 
 class ArvadosExpectedBytesTest(ArvadosBaseTestCase):

commit 2b52a4885952a8a3eed01b03af33210fc86d6ce5
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Tue Jul 26 17:55:29 2016 -0300

    9463: Added logging messages to warn the user about anomalous cases

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index eb8199c..f076773 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -21,6 +21,7 @@ import sys
 import tempfile
 import threading
 import copy
+import logging
 from apiclient import errors as apiclient_errors
 
 import arvados.commands._util as arv_cmd
@@ -312,6 +313,7 @@ class ArvPutUploadJob(object):
         self._stop_checkpointer = threading.Event()
         self._checkpointer = threading.Thread(target=self._update_task)
         self._update_task_time = update_time  # How many seconds wait between update runs
+        self.logger = logging.getLogger('arvados.arv_put')
         # Load cached data if any and if needed
         self._setup_state()
 
@@ -451,7 +453,7 @@ class ArvPutUploadJob(object):
                         resume_offset = file_in_collection.size()
                     else:
                         # Inconsistent cache, re-upload the file
-                        pass # TODO: log warning message
+                        self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
                 else:
                     # Local file differs from cached data, re-upload it
                     pass
@@ -551,6 +553,7 @@ class ArvPutUploadJob(object):
             os.fsync(new_cache)
             os.rename(new_cache_name, self._cache_filename)
         except (IOError, OSError, ResumeCacheConflict) as error:
+            self.logger.error("There was a problem while saving the cache file: {}".format(error))
             try:
                 os.unlink(new_cache_name)
             except NameError:  # mkstemp failed.

commit 05e30c2874cfee6e448de38254d4eb6007abb1cd
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Tue Jul 26 16:45:29 2016 -0300

    9463: Optimizations on _write_ffile()

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 7443dc2..eb8199c 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -422,7 +422,6 @@ class ArvPutUploadJob(object):
 
     def _write_file(self, source, filename):
         resume_offset = 0
-        resume_upload = False
         if self.resume:
             # Check if file was already uploaded (at least partially)
             with self._collection_lock:
@@ -433,38 +432,38 @@ class ArvPutUploadJob(object):
                     file_in_collection = None
             # If no previous cached data on this file, store it for an eventual
             # repeated run.
-            if source not in self._state['files'].keys():
+            if source not in self._state['files']:
                 with self._state_lock:
                     self._state['files'][source] = {
-                        'mtime' : os.path.getmtime(source),
+                        'mtime': os.path.getmtime(source),
                         'size' : os.path.getsize(source)
                     }
             cached_file_data = self._state['files'][source]
             # See if this file was already uploaded at least partially
             if file_in_collection:
                 if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
-                    if os.path.getsize(source) == file_in_collection.size():
+                    if cached_file_data['size'] == file_in_collection.size():
                         # File already there, skip it.
-                        self.bytes_skipped += os.path.getsize(source)
+                        self.bytes_skipped += cached_file_data['size']
                         return
-                    elif os.path.getsize(source) > file_in_collection.size():
+                    elif cached_file_data['size'] > file_in_collection.size():
                         # File partially uploaded, resume!
-                        resume_upload = True
                         resume_offset = file_in_collection.size()
                     else:
                         # Inconsistent cache, re-upload the file
-                        pass
+                        pass # TODO: log warning message
                 else:
                     # Local file differs from cached data, re-upload it
                     pass
         with open(source, 'r') as source_fd:
-            if self.resume and resume_upload:
+            if resume_offset > 0:
+                # Start upload where we left off
                 with self._collection_lock:
-                    # Open for appending
                     output = self._my_collection().open(filename, 'a')
                 source_fd.seek(resume_offset)
                 self.bytes_skipped += resume_offset
             else:
+                # Start from scratch
                 with self._collection_lock:
                     output = self._my_collection().open(filename, 'w')
             self._write(source_fd, output)

commit bca6c8e4c3d880955d19f7b6ff50bc3fbc31146c
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Tue Jul 26 14:46:09 2016 -0300

    9463: Use a constant as a template for initializing the empty cache

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 31cb0cb..7443dc2 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -20,6 +20,7 @@ import socket
 import sys
 import tempfile
 import threading
+import copy
 from apiclient import errors as apiclient_errors
 
 import arvados.commands._util as arv_cmd
@@ -280,6 +281,10 @@ class ResumeCache(object):
 
 class ArvPutUploadJob(object):
     CACHE_DIR = '.cache/arvados/arv-put'
+    EMPTY_STATE = {
+        'manifest' : None, # Last saved manifest checkpoint
+        'files' : {} # Previous run file list: {path : {size, mtime}}
+    }
 
     def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
                  name=None, owner_uuid=None, ensure_unique_name=False,
@@ -511,27 +516,19 @@ class ArvPutUploadJob(object):
             with self._state_lock:
                 try:
                     self._state = json.load(self._cache_file)
-                    if not 'manifest' in self._state.keys():
-                        self._state['manifest'] = ""
-                    if not 'files' in self._state.keys():
-                        self._state['files'] = {}
+                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
+                        # Cache at least partially incomplete, set up new cache
+                        self._state = copy.deepcopy(self.EMPTY_STATE)
                 except ValueError:
-                    # File empty, set up new cache
-                    self._state = {
-                        'manifest' : None,
-                        # Previous run file list: {path : {size, mtime}}
-                        'files' : {}
-                    }
+                    # Cache file empty, set up new cache
+                    self._state = copy.deepcopy(self.EMPTY_STATE)
             # Load how many bytes were uploaded on previous run
             with self._collection_lock:
                 self.bytes_written = self._collection_size(self._my_collection())
         # No resume required
         else:
             with self._state_lock:
-                self._state = {
-                    'manifest' : None,
-                    'files' : {} # Previous run file list: {path : {size, mtime}}
-                }
+                self._state = copy.deepcopy(self.EMPTY_STATE)
 
     def _lock_file(self, fileobj):
         try:

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list