[ARVADOS] updated: 20f65ff9a7c6c6f73b152d122b70fb44ea5a21a4
git at public.curoverse.com
git at public.curoverse.com
Thu Jan 1 21:30:02 EST 2015
Summary of changes:
sdk/python/arvados/arvfile.py | 12 +++++++-----
sdk/python/arvados/collection.py | 10 +++++++---
sdk/python/arvados/keep.py | 19 +++++++++++++------
3 files changed, 27 insertions(+), 14 deletions(-)
via 20f65ff9a7c6c6f73b152d122b70fb44ea5a21a4 (commit)
from 68cd78ca1a20d80e0ab90d125df305f30b606f85 (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list them
in full below.
commit 20f65ff9a7c6c6f73b152d122b70fb44ea5a21a4
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Jan 1 21:31:25 2015 -0500
3198: KeepClient creates a requests session to reuse connections. export_manifest correctly strips hints when portable_locators=True.
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 61ba3e0..5c6916e 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -245,6 +245,9 @@ class AsyncKeepWriteErrors(Exception):
def __init__(self, errors):
self.errors = errors
+ def __repr__(self):
+ return "\n".join(self.errors)
+
class BlockManager(object):
def __init__(self, keep):
self._keep = keep
@@ -299,12 +302,11 @@ class BlockManager(object):
self._put_queue.task_done()
if self._put_threads is None:
- self._put_queue = Queue.Queue()
+ self._put_queue = Queue.Queue(maxsize=2)
self._put_errors = Queue.Queue()
- self._put_threads = [threading.Thread(target=worker, args=(self,)),
- threading.Thread(target=worker, args=(self,))]
- self._put_threads[0].start()
- self._put_threads[1].start()
+ self._put_threads = [threading.Thread(target=worker, args=(self,)), threading.Thread(target=worker, args=(self,))]
+ for t in self._put_threads:
+ t.start()
block.state = BufferBlock.PENDING
self._put_queue.put(block)
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 5857fb2..b07a643 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -922,14 +922,14 @@ class Collection(CollectionBase):
self.set_unmodified()
@_populate_first
- def save_as(self, name, owner_uuid=None):
+ def save_as(self, name, owner_uuid=None, ensure_unique_name=False):
self._my_block_manager().commit_all()
self._my_keep().put(self.manifest_text(strip=True))
body = {"manifest_text": self.manifest_text(strip=False),
"name": name}
if owner_uuid:
body["owner_uuid"] = owner_uuid
- self._api_response = self._my_api().collections().create(body=body).execute(num_retries=self.num_retries)
+ self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=self.num_retries)
self._manifest_locator = self._api_response["uuid"]
self.set_unmodified()
@@ -1002,6 +1002,8 @@ def export_manifest(item, stream_name=".", portable_locators=False):
loc = s.locator
if loc.startswith("bufferblock"):
loc = v.parent._my_block_manager()._bufferblocks[loc].locator()
+ if portable_locators:
+ loc = KeepLocator(loc).stripped()
st.append(LocatorAndRange(loc, locator_block_size(loc),
s.segment_offset, s.range_size))
stream[k] = st
@@ -1009,13 +1011,15 @@ def export_manifest(item, stream_name=".", portable_locators=False):
buf += ' '.join(normalize_stream(stream_name, stream))
buf += "\n"
for k in [s for s in sorted_keys if isinstance(item[s], Collection)]:
- buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k))
+ buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k), portable_locators=portable_locators)
elif isinstance(item, ArvadosFile):
st = []
for s in item._segments:
loc = s.locator
if loc.startswith("bufferblock"):
loc = item._bufferblocks[loc].calculate_locator()
+ if portable_locators:
+ loc = KeepLocator(loc).stripped()
st.append(LocatorAndRange(loc, locator_block_size(loc),
s.segment_offset, s.range_size))
stream[stream_name] = st
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 0c5eac8..7080465 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -58,6 +58,9 @@ class KeepLocator(object):
self.permission_hint()] + self.hints
if s is not None)
+ def stripped(self):
+ return "%s+%i" % (self.md5sum, self.size)
+
def _make_hex_prop(name, length):
# Build and return a new property with the given name that
# must be a hex string of the given length.
@@ -273,10 +276,11 @@ class KeepClient(object):
HTTP_ERRORS = (requests.exceptions.RequestException,
socket.error, ssl.SSLError)
- def __init__(self, root, **headers):
+ def __init__(self, root, session, **headers):
self.root = root
self.last_result = None
self.success_flag = None
+ self.session = session
self.get_headers = {'Accept': 'application/octet-stream'}
self.get_headers.update(headers)
self.put_headers = headers
@@ -299,7 +303,7 @@ class KeepClient(object):
_logger.debug("Request: GET %s", url)
try:
with timer.Timer() as t:
- result = requests.get(url.encode('utf-8'),
+ result = self.session.get(url.encode('utf-8'),
headers=self.get_headers,
timeout=timeout)
except self.HTTP_ERRORS as e:
@@ -325,7 +329,7 @@ class KeepClient(object):
url = self.root + hash_s
_logger.debug("Request: PUT %s", url)
try:
- result = requests.put(url.encode('utf-8'),
+ result = self.session.put(url.encode('utf-8'),
data=body,
headers=self.put_headers,
timeout=timeout)
@@ -365,9 +369,10 @@ class KeepClient(object):
def run_with_limiter(self, limiter):
if self.service.finished():
return
- _logger.debug("KeepWriterThread %s proceeding %s %s",
+ _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
str(threading.current_thread()),
self.args['data_hash'],
+ len(self.args['data']),
self.args['service_root'])
self._success = bool(self.service.put(
self.args['data_hash'],
@@ -376,9 +381,10 @@ class KeepClient(object):
status = self.service.last_status()
if self._success:
result = self.service.last_result
- _logger.debug("KeepWriterThread %s succeeded %s %s",
+ _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
str(threading.current_thread()),
self.args['data_hash'],
+ len(self.args['data']),
self.args['service_root'])
# Tick the 'done' counter for the number of replica
# reported stored by the server, for the case that
@@ -456,6 +462,7 @@ class KeepClient(object):
self.put = self.local_store_put
else:
self.num_retries = num_retries
+ self.session = requests.Session()
if proxy:
if not proxy.endswith('/'):
proxy += '/'
@@ -547,7 +554,7 @@ class KeepClient(object):
local_roots = self.weighted_service_roots(md5_s, force_rebuild)
for root in local_roots:
if root not in roots_map:
- roots_map[root] = self.KeepService(root, **headers)
+ roots_map[root] = self.KeepService(root, self.session, **headers)
return local_roots
@staticmethod
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list