[ARVADOS] updated: 5e1c8439078e25d42e9c339694d0ce8581944870

git at public.curoverse.com git at public.curoverse.com
Thu May 29 16:59:03 EDT 2014


Summary of changes:
 sdk/python/arvados/collection.py     | 23 +++++------
 sdk/python/arvados/commands/put.py   | 56 +++++++++++++++++++--------
 sdk/python/tests/test_arv-put.py     |  9 +----
 sdk/python/tests/test_collections.py | 75 +++++++-----------------------------
 4 files changed, 63 insertions(+), 100 deletions(-)

       via  5e1c8439078e25d42e9c339694d0ce8581944870 (commit)
       via  add055333a1f723e384ec60e0e97592d2d19fb86 (commit)
       via  55aa14a51e1cc96e6e313bf0229b8cb3eb2afff4 (commit)
       via  bcfe58a8b188f04f77eaec2dc11acbfc1c468c53 (commit)
      from  d8f8cdb252b3f66684e1b1f1f1b5cc13622835fd (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 5e1c8439078e25d42e9c339694d0ce8581944870
Author: Brett Smith <brett at curoverse.com>
Date:   Thu May 29 16:36:30 2014 -0400

    2752: arv-put checkpoints after a PUT to Keep.
    
    This allows us to recover after a pulled-the-plug kind of failure, but
    still has noticeably less overhead than checkpointing after each file.

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 742d78e..3229125 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -247,6 +247,8 @@ class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
             return writer
 
     def cache_state(self):
+        if self.cache is None:
+            return
         state = self.dump_state()
         # Transform attributes for serialization.
         for attr, value in state.items():
@@ -261,12 +263,14 @@ class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
             self.reporter(self.bytes_written, self.bytes_expected)
 
     def flush_data(self):
-        bytes_buffered = self._data_buffer_len
+        start_buffer_len = self._data_buffer_len
+        start_block_count = self.bytes_written / self.KEEP_BLOCK_SIZE
         super(ArvPutCollectionWriter, self).flush_data()
-        # Checkpoint and report progress if data was PUT to Keep.
-        if self._data_buffer_len < start_buffer_len:
+        if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
             self.bytes_written += (start_buffer_len - self._data_buffer_len)
             self.report_progress()
+            if (self.bytes_written / self.KEEP_BLOCK_SIZE) > start_block_count:
+                self.cache_state()
 
     def _record_new_input(self, input_type, source_name, dest_name):
         # The key needs to be a list because that's what we'll get back

commit add055333a1f723e384ec60e0e97592d2d19fb86
Author: Brett Smith <brett at curoverse.com>
Date:   Thu May 29 16:09:36 2014 -0400

    2752: arv-put handles exit signals.

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 912224a..742d78e 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -11,9 +11,12 @@ import fcntl
 import hashlib
 import json
 import os
+import signal
 import sys
 import tempfile
 
+CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
+
 def parse_arguments(arguments):
     parser = argparse.ArgumentParser(
         description='Copy data from the local filesystem to Keep.')
@@ -339,21 +342,29 @@ def main(arguments=None):
 
     writer = ArvPutCollectionWriter.from_cache(
         resume_cache, reporter, expected_bytes_for(args.paths))
+
+    def signal_handler(sigcode, frame):
+        writer.cache_state()
+        sys.exit(-sigcode)
+    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
+    # the originals.
+    orig_signal_handlers = {sigcode: signal.signal(sigcode, signal_handler)
+                            for sigcode in CAUGHT_SIGNALS}
+
     if writer.bytes_written > 0:  # We're resuming a previous upload.
         print >>sys.stderr, "arv-put: Resuming previous upload.  Bypass with the --no-resume option."
         writer.report_progress()
 
     try:
         writer.do_queued_work()  # Do work resumed from cache.
-        # Copy file data to Keep.
-        for path in args.paths:
+        for path in args.paths:  # Copy file data to Keep.
             if os.path.isdir(path):
                 writer.write_directory_tree(
                     path, max_manifest_depth=args.max_manifest_depth)
             else:
                 writer.start_new_stream()
                 writer.write_file(path, args.filename or os.path.basename(path))
-    except (Exception, KeyboardInterrupt):
+    except Exception:
         writer.cache_state()
         raise
 
@@ -373,6 +384,10 @@ def main(arguments=None):
 
         # Print the locator (uuid) of the new collection.
         print writer.finish()
+
+    for sigcode, orig_handler in orig_signal_handlers.items():
+        signal.signal(sigcode, orig_handler)
+
     resume_cache.destroy()
 
 if __name__ == '__main__':

commit 55aa14a51e1cc96e6e313bf0229b8cb3eb2afff4
Author: Brett Smith <brett at curoverse.com>
Date:   Thu May 29 16:36:14 2014 -0400

    2752: Resumed collection writer doesn't do_queued_work immediately.
    
    As noted in the comments, callers that build a writer from resumed
    state must do_queued_work on it before anything else.  But this split
    makes it easier to treat initialization problems and work problems
    separately, which is critical.
    
    This required refactoring progress reporting a bit.

diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 29c44b4..e4c008e 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -171,7 +171,7 @@ class CollectionWriter(object):
     def __exit__(self):
         self.finish()
 
-    def _do_queued_work(self):
+    def do_queued_work(self):
         # The work queue consists of three pieces:
         # * _queued_file: The file object we're currently writing to the
         #   Collection.
@@ -251,12 +251,12 @@ class CollectionWriter(object):
 
     def write_file(self, source, filename=None):
         self._queue_file(source, filename)
-        self._do_queued_work()
+        self.do_queued_work()
 
     def write_directory_tree(self,
                              path, stream_name='.', max_manifest_depth=-1):
         self._queue_tree(path, stream_name, max_manifest_depth)
-        self._do_queued_work()
+        self.do_queued_work()
 
     def write(self, newdata):
         if hasattr(newdata, '__iter__'):
@@ -380,6 +380,12 @@ class ResumableCollectionWriter(CollectionWriter):
 
     @classmethod
     def from_state(cls, state, *init_args, **init_kwargs):
+        # Try to build a new writer from scratch with the given state.
+        # If the state is not suitable to resume (because files have changed,
+        # been deleted, aren't predictable, etc.), raise a
+        # StaleWriterStateError.  Otherwise, return the initialized writer.
+        # The caller is responsible for calling writer.do_queued_work()
+        # appropriately after it's returned.
         writer = cls(*init_args, **init_kwargs)
         for attr_name in cls.STATE_PROPS:
             attr_value = state[attr_name]
@@ -403,13 +409,8 @@ class ResumableCollectionWriter(CollectionWriter):
             except IOError as error:
                 raise errors.StaleWriterStateError(
                     "failed to reopen active file {}: {}".format(path, error))
-        writer.preresume_hook()
-        writer._do_queued_work()
         return writer
 
-    def preresume_hook(self):
-        pass  # Subclasses can override this as desired.
-
     def check_dependencies(self):
         for path, orig_stat in self._dependencies.items():
             if not S_ISREG(orig_stat[ST_MODE]):
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 8667026..912224a 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -227,10 +227,7 @@ class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
         self.bytes_written = 0
         self._seen_inputs = []
         self.cache = cache
-        if reporter is None:
-            self.report_progress = lambda bytes_w, bytes_e: None
-        else:
-            self.report_progress = reporter
+        self.reporter = reporter
         self.bytes_expected = bytes_expected
         super(ArvPutCollectionWriter, self).__init__()
 
@@ -246,10 +243,6 @@ class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
         else:
             return writer
 
-    def preresume_hook(self):
-        print >>sys.stderr, "arv-put: Resuming previous upload.  Bypass with the --no-resume option."
-        self.report_progress(self.bytes_written, self.bytes_expected)
-
     def cache_state(self):
         state = self.dump_state()
         # Transform attributes for serialization.
@@ -260,11 +253,17 @@ class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
                 state[attr] = list(value)
         self.cache.save(state)
 
+    def report_progress(self):
+        if self.reporter is not None:
+            self.reporter(self.bytes_written, self.bytes_expected)
+
     def flush_data(self):
         bytes_buffered = self._data_buffer_len
         super(ArvPutCollectionWriter, self).flush_data()
-        self.bytes_written += (bytes_buffered - self._data_buffer_len)
-        self.report_progress(self.bytes_written, self.bytes_expected)
+        # Checkpoint and report progress if data was PUT to Keep.
+        if self._data_buffer_len < start_buffer_len:
+            self.bytes_written += (start_buffer_len - self._data_buffer_len)
+            self.report_progress()
 
     def _record_new_input(self, input_type, source_name, dest_name):
         # The key needs to be a list because that's what we'll get back
@@ -338,10 +337,14 @@ def main(arguments=None):
         print "arv-put: Another process is already uploading this data."
         sys.exit(1)
 
-    try:
-        writer = ArvPutCollectionWriter.from_cache(
-            resume_cache, reporter, expected_bytes_for(args.paths))
+    writer = ArvPutCollectionWriter.from_cache(
+        resume_cache, reporter, expected_bytes_for(args.paths))
+    if writer.bytes_written > 0:  # We're resuming a previous upload.
+        print >>sys.stderr, "arv-put: Resuming previous upload.  Bypass with the --no-resume option."
+        writer.report_progress()
 
+    try:
+        writer.do_queued_work()  # Do work resumed from cache.
         # Copy file data to Keep.
         for path in args.paths:
             if os.path.isdir(path):
diff --git a/sdk/python/tests/test_arv-put.py b/sdk/python/tests/test_arv-put.py
index dde42e6..41fb330 100644
--- a/sdk/python/tests/test_arv-put.py
+++ b/sdk/python/tests/test_arv-put.py
@@ -265,13 +265,8 @@ class ArvadosPutCollectionWriterTest(ArvadosKeepLocalStoreTestCase):
             cwriter.write_file(testfile.name, 'test')
             cwriter.finish_current_stream()
             cwriter.cache_state()
-            # Restore a writer from that state and check its progress report.
-            # We're also checking that progress is reported immediately after
-            # resuming.
-            progression, reporter = self.make_progress_tester()
-            new_writer = arv_put.ArvPutCollectionWriter.from_cache(
-                self.cache, reporter, bytes_expected=4)
-            self.assertIn((4, 4), progression)
+            new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
+            self.assertEqual(new_writer.bytes_written, 4)
 
 
 class ArvadosExpectedBytesTest(ArvadosBaseTestCase):

commit bcfe58a8b188f04f77eaec2dc11acbfc1c468c53
Author: Brett Smith <brett at curoverse.com>
Date:   Thu May 29 13:53:29 2014 -0400

    2752: Remove unused CollectionWriter checkpoint hook.

diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index f339381..29c44b4 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -194,11 +194,6 @@ class CollectionWriter(object):
                 self._work_trees()
             else:
                 break
-            self.checkpoint_state()
-
-    def checkpoint_state(self):
-        # Subclasses can implement this method to, e.g., report or record state.
-        pass
 
     def _work_file(self):
         while True:
@@ -280,7 +275,6 @@ class CollectionWriter(object):
             self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
             self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
             self._data_buffer_len = len(self._data_buffer[0])
-            self.checkpoint_state()
 
     def start_new_file(self, newfilename=None):
         self.finish_current_file()
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index 4d1e150..d867f7b 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -16,16 +16,8 @@ from arvados_testutil import ArvadosKeepLocalStoreTestCase
 class TestResumableWriter(arvados.ResumableCollectionWriter):
     KEEP_BLOCK_SIZE = 1024  # PUT to Keep every 1K.
 
-    def __init__(self):
-        self.saved_states = []
-        return super(TestResumableWriter, self).__init__()
-
-    def checkpoint_state(self):
-        self.saved_states.append(self.dump_state(copy.deepcopy))
-
-    def last_state(self):
-        assert self.saved_states, "resumable writer did not save any state"
-        return self.saved_states[-1]
+    def current_state(self):
+        return self.dump_state(copy.deepcopy)
 
 
 class ArvadosCollectionsTest(ArvadosKeepLocalStoreTestCase):
@@ -525,27 +517,11 @@ class ArvadosCollectionsTest(ArvadosKeepLocalStoreTestCase):
             cwriter.manifest_text(),
             ". 902fbdd2b1df0c4f70b4a5d23525e932+3 0:1:A 1:1:B 2:1:C\n")
 
-    def test_checkpoint_after_put(self):
-        cwriter = TestResumableWriter()
-        with self.make_test_file(
-              't' * (cwriter.KEEP_BLOCK_SIZE + 10)) as testfile:
-            testpath = os.path.realpath(testfile.name)
-            cwriter.write_file(testpath, 'test')
-        for state in cwriter.saved_states:
-            if state.get('_current_file') == (testpath,
-                                              cwriter.KEEP_BLOCK_SIZE):
-                break
-        else:
-            self.fail("can't find state immediately after PUT to Keep")
-        self.assertIn('d45107e93f9052fa88a82fc08bb1d316+1024',  # 't' * 1024
-                      state['_current_stream_locators'])
-
     def test_basic_resume(self):
         cwriter = TestResumableWriter()
         with self.make_test_file() as testfile:
             cwriter.write_file(testfile.name, 'test')
-            last_state = cwriter.last_state()
-            resumed = TestResumableWriter.from_state(last_state)
+            resumed = TestResumableWriter.from_state(cwriter.current_state())
         self.assertEquals(cwriter.manifest_text(), resumed.manifest_text(),
                           "resumed CollectionWriter had different manifest")
 
@@ -555,7 +531,7 @@ class ArvadosCollectionsTest(ArvadosKeepLocalStoreTestCase):
             cwriter.write_file(testfile.name, 'test')
         self.assertRaises(arvados.errors.StaleWriterStateError,
                           TestResumableWriter.from_state,
-                          cwriter.last_state())
+                          cwriter.current_state())
 
     def test_resume_fails_when_dependency_mtime_changed(self):
         cwriter = TestResumableWriter()
@@ -564,14 +540,14 @@ class ArvadosCollectionsTest(ArvadosKeepLocalStoreTestCase):
             os.utime(testfile.name, (0, 0))
             self.assertRaises(arvados.errors.StaleWriterStateError,
                               TestResumableWriter.from_state,
-                              cwriter.last_state())
+                              cwriter.current_state())
 
     def test_resume_fails_when_dependency_is_nonfile(self):
         cwriter = TestResumableWriter()
         cwriter.write_file('/dev/null', 'empty')
         self.assertRaises(arvados.errors.StaleWriterStateError,
                           TestResumableWriter.from_state,
-                          cwriter.last_state())
+                          cwriter.current_state())
 
     def test_resume_fails_when_dependency_size_changed(self):
         cwriter = TestResumableWriter()
@@ -583,41 +559,16 @@ class ArvadosCollectionsTest(ArvadosKeepLocalStoreTestCase):
             os.utime(testfile.name, (orig_mtime, orig_mtime))
             self.assertRaises(arvados.errors.StaleWriterStateError,
                               TestResumableWriter.from_state,
-                              cwriter.last_state())
+                              cwriter.current_state())
 
     def test_resume_fails_with_expired_locator(self):
         cwriter = TestResumableWriter()
-        with self.make_test_file() as testfile:
-            cwriter.write_file(testfile.name, 'test')
-            cwriter.finish_current_stream()
-            state = cwriter.last_state()
-            # Get the last locator, remove any permission hint, and add
-            # an expired one.
-            new_loc = state['_current_stream_locators'][-1].split('+A', 1)[0]
-            state['_current_stream_locators'][-1] = "{}+A{}@10000000".format(
-                new_loc, 'a' * 40)
-            self.assertRaises(arvados.errors.StaleWriterStateError,
-                              TestResumableWriter.from_state, state)
-
-    def test_successful_resumes(self):
-        # FIXME: This is more of an integration test than a unit test.
-        cwriter = TestResumableWriter()
-        source_tree = self.build_directory_tree(['basefile', 'subdir/subfile'])
-        with open(os.path.join(source_tree, 'long'), 'w') as longfile:
-            longfile.write('t' * (cwriter.KEEP_BLOCK_SIZE + 10))
-        cwriter.write_directory_tree(source_tree)
-        # A state for each file, plus a fourth for mid-longfile.
-        self.assertGreater(len(cwriter.saved_states), 3,
-                           "CollectionWriter didn't save enough states to test")
-
-        for state in cwriter.saved_states:
-            new_writer = TestResumableWriter.from_state(state)
-            manifests = [writer.manifest_text()
-                         for writer in (cwriter, new_writer)]
-            self.assertEquals(
-                manifests[0], manifests[1],
-                "\n".join(["manifest mismatch after resuming from state:",
-                           pprint.pformat(state), ""] + manifests))
+        state = cwriter.current_state()
+        # Add an expired locator to the state.
+        state['_current_stream_locators'].append(''.join([
+                    'a' * 32, '+A', 'b' * 40, '@', '10000000']))
+        self.assertRaises(arvados.errors.StaleWriterStateError,
+                          TestResumableWriter.from_state, state)
 
     def test_arbitrary_objects_not_resumable(self):
         cwriter = TestResumableWriter()

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list