[ARVADOS] created: 92ce25eae546140016b2c1acb9b7f4891ee34387
git at public.curoverse.com
git at public.curoverse.com
Tue Sep 22 11:59:51 EDT 2015
at 92ce25eae546140016b2c1acb9b7f4891ee34387 (commit)
commit 92ce25eae546140016b2c1acb9b7f4891ee34387
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue Sep 22 11:47:25 2015 -0400
7225: Fix typo in _BufferBlock.set_state() (self.state used instead of self._state)
that caused the "wait for pending block commit" code to be skipped. Add a
stop_threads() call (in a finally block) to commit_all() to shut down the Put and
Get worker threads.
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 106f7a7..9ed97c6 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -320,7 +320,7 @@ class _BufferBlock(object):
@synchronized
def set_state(self, nextstate, val=None):
if (self._state, nextstate) not in self.STATE_TRANSITIONS:
- raise StateChangeError("Invalid state change from %s to %s" % (self.state, nextstate), self.state, nextstate)
+ raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
self._state = nextstate
if self._state == _BufferBlock.PENDING:
@@ -617,29 +617,32 @@ class _BlockManager(object):
are uploaded. Raises KeepWriteError() if any blocks failed to upload.
"""
- with self.lock:
- items = self._bufferblocks.items()
+ try:
+ with self.lock:
+ items = self._bufferblocks.items()
- for k,v in items:
- if v.state() != _BufferBlock.COMMITTED:
- v.owner.flush(sync=False)
+ for k,v in items:
+ if v.state() != _BufferBlock.COMMITTED:
+ v.owner.flush(sync=False)
- with self.lock:
- if self._put_queue is not None:
- self._put_queue.join()
-
- err = []
- for k,v in items:
- if v.state() == _BufferBlock.ERROR:
- err.append((v.locator(), v.error))
- if err:
- raise KeepWriteError("Error writing some blocks", err, label="block")
-
- for k,v in items:
- # flush again with sync=True to remove committed bufferblocks from
- # the segments.
- if v.owner:
- v.owner.flush(sync=True)
+ with self.lock:
+ if self._put_queue is not None:
+ self._put_queue.join()
+
+ err = []
+ for k,v in items:
+ if v.state() == _BufferBlock.ERROR:
+ err.append((v.locator(), v.error))
+ if err:
+ raise KeepWriteError("Error writing some blocks", err, label="block")
+
+ for k,v in items:
+ # flush again with sync=True to remove committed bufferblocks from
+ # the segments.
+ if v.owner:
+ v.owner.flush(sync=True)
+ finally:
+ self.stop_threads()
def block_prefetch(self, locator):
"""Initiate a background download of a block.
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list.