[ARVADOS] created: 92ce25eae546140016b2c1acb9b7f4891ee34387

git at public.curoverse.com git at public.curoverse.com
Tue Sep 22 11:59:51 EDT 2015


        at  92ce25eae546140016b2c1acb9b7f4891ee34387 (commit)


commit 92ce25eae546140016b2c1acb9b7f4891ee34387
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Sep 22 11:47:25 2015 -0400

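    7225: Fix typo in _BufferBlock.set_state() (self.state was used instead of
    self._state when raising StateChangeError) that resulted in the "wait for
    pending block commit" code getting skipped.  Wrap the body of commit_all() in
    try/finally and call stop_threads() in the finally clause to shut down the
    Put and Get worker threads.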

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 106f7a7..9ed97c6 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -320,7 +320,7 @@ class _BufferBlock(object):
     @synchronized
     def set_state(self, nextstate, val=None):
         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
-            raise StateChangeError("Invalid state change from %s to %s" % (self.state, nextstate), self.state, nextstate)
+            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
         self._state = nextstate
 
         if self._state == _BufferBlock.PENDING:
@@ -617,29 +617,32 @@ class _BlockManager(object):
         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
 
         """
-        with self.lock:
-            items = self._bufferblocks.items()
+        try:
+            with self.lock:
+                items = self._bufferblocks.items()
 
-        for k,v in items:
-            if v.state() != _BufferBlock.COMMITTED:
-                v.owner.flush(sync=False)
+            for k,v in items:
+                if v.state() != _BufferBlock.COMMITTED:
+                    v.owner.flush(sync=False)
 
-        with self.lock:
-            if self._put_queue is not None:
-                self._put_queue.join()
-
-                err = []
-                for k,v in items:
-                    if v.state() == _BufferBlock.ERROR:
-                        err.append((v.locator(), v.error))
-                if err:
-                    raise KeepWriteError("Error writing some blocks", err, label="block")
-
-        for k,v in items:
-            # flush again with sync=True to remove committed bufferblocks from
-            # the segments.
-            if v.owner:
-                v.owner.flush(sync=True)
+            with self.lock:
+                if self._put_queue is not None:
+                    self._put_queue.join()
+
+                    err = []
+                    for k,v in items:
+                        if v.state() == _BufferBlock.ERROR:
+                            err.append((v.locator(), v.error))
+                    if err:
+                        raise KeepWriteError("Error writing some blocks", err, label="block")
+
+            for k,v in items:
+                # flush again with sync=True to remove committed bufferblocks from
+                # the segments.
+                if v.owner:
+                    v.owner.flush(sync=True)
+        finally:
+            self.stop_threads()
 
     def block_prefetch(self, locator):
         """Initiate a background download of a block.

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list