[ARVADOS] created: 548f183c23606b9febcadbbfd658aa921c3baaf5
git at public.curoverse.com
git at public.curoverse.com
Thu May 15 10:01:54 EDT 2014
at 548f183c23606b9febcadbbfd658aa921c3baaf5 (commit)
commit 548f183c23606b9febcadbbfd658aa921c3baaf5
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu May 15 10:01:50 2014 -0400
2751: Added code to recognize keep proxies in the keep_disks() response and set
using_proxy flag. Added support for 'X-Keep-Desired-Replication' and
'X-Keep-Replicas-Stored' HTTP headers. Added ARVADOS_KEEP_PROXY environment
variable.
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 88487ae..c506997 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -111,6 +111,12 @@ class KeepClient(object):
url = self.args['service_root'] + self.args['data_hash']
api_token = config.get('ARVADOS_API_TOKEN')
headers = {'Authorization': "OAuth2 %s" % api_token}
+
+ if self.args['using_proxy']:
+ # We're using a proxy, so tell the proxy how many copies we
+ # want it to store
+ headers['X-Keep-Desired-Replication'] = str(self.args['want_copies'])
+
try:
resp, content = h.request(url.encode('utf-8'), 'PUT',
headers=headers,
@@ -129,7 +135,19 @@ class KeepClient(object):
(str(threading.current_thread()),
self.args['data_hash'],
self.args['service_root']))
- return limiter.increment_done()
+
+ if 'x-keep-replicas-stored' in resp:
+ # Tick the 'done' counter once per replica the server
+ # reports as stored, for the case that
+ # we're talking to a proxy or other backend that
+ # stores multiple copies for us.
+ replicas = int(resp['x-keep-replicas-stored'])
+ while replicas > 0:
+ limiter.increment_done()
+ replicas -= 1
+ else:
+ limiter.increment_done()
+ return
logging.warning("Request fail: PUT %s => %s %s" %
(url, resp['status'], content))
except (httplib2.HttpLib2Error, httplib.HTTPException) as e:
@@ -143,21 +161,35 @@ class KeepClient(object):
self._cache = []
# default 256 megabyte cache
self.cache_max = 256 * 1024 * 1024
+ self.using_proxy = False
def shuffled_service_roots(self, hash):
if self.service_roots == None:
self.lock.acquire()
- try:
- keep_disks = arvados.api().keep_disks().list().execute()['items']
- roots = (("http%s://%s:%d/" %
- ('s' if f['service_ssl_flag'] else '',
- f['service_host'],
- f['service_port']))
- for f in keep_disks)
- self.service_roots = sorted(set(roots))
- logging.debug(str(self.service_roots))
- finally:
- self.lock.release()
+
+ # Override normal keep disk lookup with an explicit proxy
+ # configuration.
+ keep_proxy_env = self.config.get("ARVADOS_KEEP_PROXY")
+ if keep_proxy_env != None:
+ if keep_proxy_env[-1:] != '/':
+ keep_proxy_env += "/"
+ self.service_roots = [keep_proxy_env]
+ self.using_proxy = True
+ else:
+ try:
+ keep_disks = arvados.api().keep_disks().list().execute()['items']
+ roots = (("http%s://%s:%d/" %
+ ('s' if f['service_ssl_flag'] else '',
+ f['service_host'],
+ f['service_port']))
+ for f in keep_disks)
+ self.service_roots = sorted(set(roots))
+ if len(keep_disks) == 1 and re.match(r'[a-z0-9]{5}-penuu-keepproxy......', keep_disks[0]['uuid']):
+ # Proxies have a special UUID pattern.
+ self.using_proxy = True
+ logging.debug(str(self.service_roots))
+ finally:
+ self.lock.release()
seed = hash
pool = self.service_roots[:]
@@ -208,7 +240,7 @@ class KeepClient(object):
self._cache_lock.release()
def reserve_cache(self, locator):
- '''Reserve a cache slot for the specified locator,
+ '''Reserve a cache slot for the specified locator,
or return the existing slot.'''
self._cache_lock.acquire()
try:
@@ -281,8 +313,8 @@ class KeepClient(object):
with timer.Timer() as t:
resp, content = h.request(url.encode('utf-8'), 'GET',
headers=headers)
- logging.info("Received %s bytes in %s msec (%s MiB/sec)" % (len(content),
- t.msecs,
+ logging.info("Received %s bytes in %s msec (%s MiB/sec)" % (len(content),
+ t.msecs,
(len(content)/(1024*1024))/t.secs))
if re.match(r'^2\d\d$', resp['status']):
m = hashlib.new('md5')
@@ -312,13 +344,15 @@ class KeepClient(object):
t = KeepClient.KeepWriterThread(data=data,
data_hash=data_hash,
service_root=service_root,
- thread_limiter=thread_limiter)
+ thread_limiter=thread_limiter,
+ using_proxy=self.using_proxy,
+ want_copies=(want_copies if self.using_proxy else 1))
t.start()
threads += [t]
for t in threads:
t.join()
have_copies = thread_limiter.done()
- if have_copies == want_copies:
+ if have_copies >= want_copies:
return (data_hash + '+' + str(len(data)))
raise arvados.errors.KeepWriteError(
"Write fail for %s: wanted %d but wrote %d" %
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list