[ARVADOS] updated: 20b988d5b901f459f95043c3702f1f9b104f3fbf
git at public.curoverse.com
git at public.curoverse.com
Wed Apr 29 10:05:36 EDT 2015
Summary of changes:
sdk/python/arvados/errors.py | 1 -
sdk/python/arvados/keep.py | 21 ++-------------------
sdk/python/arvados/retry.py | 8 ++++----
sdk/python/tests/arvados_testutil.py | 23 +++++++----------------
sdk/python/tests/test_collections.py | 14 +++++++-------
sdk/python/tests/test_keep_client.py | 34 ++++++++++++++--------------------
sdk/python/tests/test_retry.py | 2 --
sdk/python/tests/test_stream.py | 14 +++++++-------
8 files changed, 41 insertions(+), 76 deletions(-)
discards 71ba11571e8fa948247b34914f5ed3cc53b85585 (commit)
discards f406277fa3cc52af9705c24df3d4142c74e28646 (commit)
via 20b988d5b901f459f95043c3702f1f9b104f3fbf (commit)
via 7ab494e42b35b3769a326a16b7e90f1d20147ced (commit)
This update added new revisions after undoing existing revisions. That is
to say, the old revision is not a strict subset of the new revision. This
situation occurs when you --force push a change and generate a repository
containing something like this:
* -- * -- B -- O -- O -- O (71ba11571e8fa948247b34914f5ed3cc53b85585)
\
N -- N -- N (20b988d5b901f459f95043c3702f1f9b104f3fbf)
When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 20b988d5b901f459f95043c3702f1f9b104f3fbf
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Apr 28 19:08:45 2015 -0400
5562: Add fake keepstore server with configurable problems.
diff --git a/sdk/python/tests/keepstub.py b/sdk/python/tests/keepstub.py
new file mode 100644
index 0000000..e84230a
--- /dev/null
+++ b/sdk/python/tests/keepstub.py
@@ -0,0 +1,95 @@
+import BaseHTTPServer
+import hashlib
+import os
+import re
+import SocketServer
+import time
+
+class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer, object):
+
+ allow_reuse_address = 1
+
+ def __init__(self, *args, **kwargs):
+ self.store = {}
+ self.delays = {
+ # before reading request headers
+ 'request': 0,
+ # before reading request body
+ 'request_body': 0,
+ # before setting response status and headers
+ 'response': 0,
+ # before sending response body
+ 'response_body': 0,
+ # before returning from handler (thus setting response EOF)
+ 'response_close': 0,
+ }
+ super(Server, self).__init__(*args, **kwargs)
+
+ def setdelays(self, **kwargs):
+ """In future requests, induce delays at the given checkpoints."""
+ for (k, v) in kwargs.iteritems():
+ self.delays.get(k) # NameError if unknown key
+ self.delays[k] = v
+
+ def _sleep_at_least(self, seconds):
+ """Sleep for given time, even if signals are received."""
+ wake = time.time() + seconds
+ todo = seconds
+ while todo > 0:
+ time.sleep(todo)
+ todo = wake - time.time()
+
+ def _do_delay(self, k):
+ self._sleep_at_least(self.delays[k])
+
+
+class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
+ def handle(self, *args, **kwargs):
+ self.server._do_delay('request')
+ return super(Handler, self).handle(*args, **kwargs)
+
+ def do_GET(self):
+ self.server._do_delay('response')
+ r = re.search(r'[0-9a-f]{32}', self.path)
+ if not r:
+ return self.send_response(422)
+ datahash = r.group(0)
+ if datahash not in self.server.store:
+ return self.send_response(404)
+ self.send_response(200)
+ self.send_header('Content-type', 'application/octet-stream')
+ self.end_headers()
+ self.server._do_delay('response_body')
+ self.wfile.write(self.server.store[datahash])
+ self.server._do_delay('response_close')
+
+ def do_PUT(self):
+ self.server._do_delay('request_body')
+ data = self.rfile.read(int(self.headers.getheader('content-length')))
+ datahash = hashlib.md5(data).hexdigest()
+ self.server.store[datahash] = data
+ self.server._do_delay('response')
+ self.send_response(200)
+ self.send_header('Content-type', 'text/plain')
+ self.end_headers()
+ self.server._do_delay('response_body')
+ self.wfile.write(datahash + '+' + str(len(data)))
+ self.server._do_delay('response_close')
+
+ def log_request(self, *args, **kwargs):
+ if os.environ.get('ARVADOS_DEBUG', None):
+ super(Handler, self).log_request(*args, **kwargs)
+
+ def finish(self, *args, **kwargs):
+ """Ignore exceptions, notably "Broken pipe" when client times out."""
+ try:
+ return super(Handler, self).finish(*args, **kwargs)
+ except:
+ pass
+
+ def handle_one_request(self, *args, **kwargs):
+ """Ignore exceptions, notably "Broken pipe" when client times out."""
+ try:
+ return super(Handler, self).handle_one_request(*args, **kwargs)
+ except:
+ pass
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 0c42c2f..419f1ce 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -5,12 +5,15 @@ import pycurl
import random
import re
import socket
+import threading
+import time
import unittest
import urlparse
import arvados
import arvados.retry
import arvados_testutil as tutil
+import keepstub
import run_test_server
class KeepTestCase(run_test_server.TestCaseWithServers):
@@ -268,9 +271,10 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
self.assertEqual('100::1', service.hostname)
self.assertEqual(10, service.port)
- # test_get_timeout and test_put_timeout test that
- # KeepClient.get and KeepClient.put use the appropriate timeouts
- # when connected directly to a Keep server (i.e. non-proxy timeout)
+ # test_*_timeout verify that KeepClient instructs pycurl to use
+ # the appropriate connection and read timeouts. They don't care
+ # whether pycurl actually exhibits the expected timeout behavior
+ # -- those tests are in the KeepClientTimeout test class.
def test_get_timeout(self):
api_client = self.mock_keep_services(count=1)
@@ -460,6 +464,91 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
self.assertEqual(2, len(exc_check.exception.request_errors()))
+class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
+ DATA = 'x' * 2**10
+
+ class assertTakesBetween(unittest.TestCase):
+ def __init__(self, tmin, tmax):
+ self.tmin = tmin
+ self.tmax = tmax
+
+ def __enter__(self):
+ self.t0 = time.time()
+
+ def __exit__(self, *args, **kwargs):
+ self.assertGreater(time.time() - self.t0, self.tmin)
+ self.assertLess(time.time() - self.t0, self.tmax)
+
+ def setUp(self):
+ sock = socket.socket()
+ sock.bind(('0.0.0.0', 0))
+ self.port = sock.getsockname()[1]
+ sock.close()
+ self.server = keepstub.Server(('0.0.0.0', self.port), keepstub.Handler)
+ self.thread = threading.Thread(target=self.server.serve_forever)
+ self.thread.daemon = True # Exit thread if main proc exits
+ self.thread.start()
+ self.api_client = self.mock_keep_services(
+ count=1,
+ service_host='localhost',
+ service_port=self.port,
+ )
+
+ def tearDown(self):
+ self.server.shutdown()
+
+ def keepClient(self, timeouts=(0.1, 1.0)):
+ return arvados.KeepClient(
+ api_client=self.api_client,
+ timeout=timeouts)
+
+ def test_timeout_slow_connect(self):
+ # Can't simulate TCP delays with our own socket. Leave our
+ # stub server running uselessly, and try to connect to an
+ # unroutable IP address instead.
+ self.api_client = self.mock_keep_services(
+ count=1,
+ service_host='240.0.0.0',
+ )
+ with self.assertTakesBetween(0.1, 0.5):
+ with self.assertRaises(arvados.errors.KeepWriteError):
+ self.keepClient((0.1, 1)).put(self.DATA, copies=1, num_retries=0)
+
+ def test_timeout_slow_request(self):
+ self.server.setdelays(request=0.2)
+ self._test_200ms()
+
+ def test_timeout_slow_response(self):
+ self.server.setdelays(response=0.2)
+ self._test_200ms()
+
+ def test_timeout_slow_response_body(self):
+ self.server.setdelays(response_body=0.2)
+ self._test_200ms()
+
+ def _test_200ms(self):
+ """Connect should be t<100ms, request should be 200ms <= t < 300ms"""
+
+ # Allow 100ms to connect, then 1s for response. Everything
+ # should work, and everything should take at least 200ms to
+ # return.
+ kc = self.keepClient((.1, 1))
+ with self.assertTakesBetween(.2, .3):
+ loc = kc.put(self.DATA, copies=1, num_retries=0)
+ with self.assertTakesBetween(.2, .3):
+ self.assertEqual(self.DATA, kc.get(loc, num_retries=0))
+
+ # Allow 1s to connect, then 100ms for response. Nothing should
+ # work, and everything should take at least 100ms to return.
+ kc = self.keepClient((1, .1))
+ with self.assertTakesBetween(.1, .2):
+ with self.assertRaises(arvados.errors.KeepReadError):
+ kc.get(loc, num_retries=0)
+ with self.assertTakesBetween(.1, .2):
+ with self.assertRaises(arvados.errors.KeepWriteError):
+ kc.put(self.DATA, copies=1, num_retries=0)
+
+
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
def mock_disks_and_gateways(self, disks=3, gateways=1):
self.gateways = [{
commit 7ab494e42b35b3769a326a16b7e90f1d20147ced
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Apr 22 14:26:43 2015 -0400
5562: Use pycurl library (instead of requests) for Keep transactions.
diff --git a/sdk/python/arvados/errors.py b/sdk/python/arvados/errors.py
index 3629520..bfd471b 100644
--- a/sdk/python/arvados/errors.py
+++ b/sdk/python/arvados/errors.py
@@ -1,7 +1,6 @@
# errors.py - Arvados-specific exceptions.
import json
-import requests
from apiclient import errors as apiclient_errors
from collections import OrderedDict
@@ -46,7 +45,7 @@ class KeepRequestError(Exception):
self.message = message
def _format_error(self, key, error):
- if isinstance(error, requests.Response):
+ if isinstance(error, HttpError):
err_fmt = "{} {} responded with {e.status_code} {e.reason}"
else:
err_fmt = "{} {} raised {e.__class__.__name__} ({e})"
@@ -61,6 +60,12 @@ class KeepRequestError(Exception):
return self._request_errors
+class HttpError(Exception):
+ def __init__(self, status_code, reason):
+ self.status_code = status_code
+ self.reason = reason
+
+
class ArgumentError(Exception):
pass
class SyntaxError(Exception):
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 842a36d..b26285e 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -1,25 +1,28 @@
+import bz2
+import datetime
+import fcntl
+import functools
import gflags
+import hashlib
+import json
import logging
import os
import pprint
-import sys
-import types
-import subprocess
-import json
-import UserDict
+import pycurl
+import Queue
import re
-import hashlib
+import socket
+import ssl
import string
-import bz2
-import zlib
-import fcntl
-import time
+import StringIO
+import subprocess
+import sys
import threading
+import time
import timer
-import datetime
-import ssl
-import socket
-import requests
+import types
+import UserDict
+import zlib
import arvados
import arvados.config as config
@@ -27,25 +30,10 @@ import arvados.errors
import arvados.retry as retry
import arvados.util
-try:
- # Workaround for urllib3 bug.
- # The 'requests' library enables urllib3's SNI support by default, which uses pyopenssl.
- # However, urllib3 prior to version 1.10 has a major bug in this feature
- # (OpenSSL WantWriteError, https://github.com/shazow/urllib3/issues/412)
- # Unfortunately Debian 8 is stabilizing on urllib3 1.9.1 which means the
- # following workaround is necessary to be able to use
- # the arvados python sdk with the distribution-provided packages.
- import urllib3
- from pkg_resources import parse_version
- if parse_version(urllib3.__version__) < parse_version('1.10'):
- from urllib3.contrib import pyopenssl
- pyopenssl.extract_from_urllib3()
-except ImportError:
- pass
-
_logger = logging.getLogger('arvados.keep')
global_client_object = None
+
class KeepLocator(object):
EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9 at _-]+$')
@@ -301,75 +289,189 @@ class KeepClient(object):
class KeepService(object):
- # Make requests to a single Keep service, and track results.
- HTTP_ERRORS = (requests.exceptions.RequestException,
- socket.error, ssl.SSLError)
+ """Make requests to a single Keep service, and track results.
+
+ A KeepService is intended to last long enough to perform one
+ transaction (GET or PUT) against one Keep service. This can
+ involve calling either get() or put() multiple times in order
+ to retry after transient failures. However, calling both get()
+ and put() on a single instance -- or using the same instance
+ to access two different Keep services -- will not produce
+ sensible behavior.
+ """
- def __init__(self, root, session, **headers):
+ HTTP_ERRORS = (
+ socket.error,
+ ssl.SSLError,
+ arvados.errors.HttpError,
+ )
+
+ def __init__(self, root, user_agent_pool=Queue.LifoQueue(), **headers):
self.root = root
- self.last_result = None
- self.success_flag = None
- self.session = session
+ self._user_agent_pool = user_agent_pool
+ self._result = {'error': None}
+ self._usable = True
+ self._session = None
self.get_headers = {'Accept': 'application/octet-stream'}
self.get_headers.update(headers)
self.put_headers = headers
def usable(self):
- return self.success_flag is not False
+ """Is it worth attempting a request?"""
+ return self._usable
def finished(self):
- return self.success_flag is not None
+ """Did the request succeed or encounter permanent failure?"""
+ return self._result['error'] == False or not self._usable
+
+ def last_result(self):
+ return self._result
- def last_status(self):
+ def _get_user_agent(self):
try:
- return self.last_result.status_code
- except AttributeError:
- return None
+ return self._user_agent_pool.get(False)
+ except Queue.Empty:
+ return pycurl.Curl()
+
+ def _put_user_agent(self, ua):
+ try:
+ ua.reset()
+ self._user_agent_pool.put(ua, False)
+ except:
+ ua.close()
def get(self, locator, timeout=None):
# locator is a KeepLocator object.
url = self.root + str(locator)
_logger.debug("Request: GET %s", url)
+ curl = self._get_user_agent()
try:
with timer.Timer() as t:
- result = self.session.get(url.encode('utf-8'),
- headers=self.get_headers,
- timeout=timeout)
+ self._headers = {}
+ response_body = StringIO.StringIO()
+ curl.setopt(pycurl.NOSIGNAL, 1)
+ curl.setopt(pycurl.URL, url.encode('utf-8'))
+ curl.setopt(pycurl.HTTPHEADER, [
+ '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
+ curl.setopt(pycurl.WRITEDATA, response_body)
+ curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+ self._setcurltimeouts(curl, timeout)
+ try:
+ curl.perform()
+ except Exception as e:
+ raise arvados.errors.HttpError(0, str(e))
+ self._result = {
+ 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
+ 'body': response_body.getvalue(),
+ 'headers': self._headers,
+ 'error': False,
+ }
+ ok = retry.check_http_response_success(self._result['status_code'])
+ if not ok:
+ self._result['error'] = arvados.errors.HttpError(
+ self._result['status_code'],
+ self._headers.get('x-status-line', 'Error'))
except self.HTTP_ERRORS as e:
+ self._result = {
+ 'error': e,
+ }
+ ok = False
+ self._usable = ok != False
+ if not ok:
_logger.debug("Request fail: GET %s => %s: %s",
- url, type(e), str(e))
- self.last_result = e
- else:
- self.last_result = result
- self.success_flag = retry.check_http_response_success(result)
- content = result.content
- _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
- self.last_status(), len(content), t.msecs,
- (len(content)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
- if self.success_flag:
- resp_md5 = hashlib.md5(content).hexdigest()
- if resp_md5 == locator.md5sum:
- return content
- _logger.warning("Checksum fail: md5(%s) = %s",
- url, resp_md5)
- return None
+ url, type(self._result['error']), str(self._result['error']))
+ # Don't return this ua to the pool, in case it's broken.
+ curl.close()
+ return None
+ self._put_user_agent(curl)
+ _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
+ self._result['status_code'],
+ len(self._result['body']),
+ t.msecs,
+ (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+ resp_md5 = hashlib.md5(self._result['body']).hexdigest()
+ if resp_md5 != locator.md5sum:
+ _logger.warning("Checksum fail: md5(%s) = %s",
+ url, resp_md5)
+ self._result['error'] = arvados.errors.HttpError(
+ 0, 'Checksum fail')
+ return None
+ return self._result['body']
def put(self, hash_s, body, timeout=None):
url = self.root + hash_s
_logger.debug("Request: PUT %s", url)
+ curl = self._get_user_agent()
try:
- result = self.session.put(url.encode('utf-8'),
- data=body,
- headers=self.put_headers,
- timeout=timeout)
+ self._headers = {}
+ response_body = StringIO.StringIO()
+ curl.setopt(pycurl.NOSIGNAL, 1)
+ curl.setopt(pycurl.URL, url.encode('utf-8'))
+ curl.setopt(pycurl.POSTFIELDS, body)
+ curl.setopt(pycurl.CUSTOMREQUEST, 'PUT')
+ curl.setopt(pycurl.HTTPHEADER, [
+ '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
+ curl.setopt(pycurl.WRITEDATA, response_body)
+ curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+ self._setcurltimeouts(curl, timeout)
+ try:
+ curl.perform()
+ except Exception as e:
+ raise arvados.errors.HttpError(0, str(e))
+ self._result = {
+ 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
+ 'body': response_body.getvalue(),
+ 'headers': self._headers,
+ 'error': False,
+ }
+ ok = retry.check_http_response_success(self._result['status_code'])
+ if not ok:
+ self._result['error'] = arvados.errors.HttpError(
+ self._result['status_code'],
+ self._headers.get('x-status-line', 'Error'))
except self.HTTP_ERRORS as e:
+ self._result = {
+ 'error': e,
+ }
+ ok = False
+ self._usable = ok != False # still usable if ok is True or None
+ if not ok:
_logger.debug("Request fail: PUT %s => %s: %s",
- url, type(e), str(e))
- self.last_result = e
+ url, type(self._result['error']), str(self._result['error']))
+ # Don't return this ua to the pool, in case it's broken.
+ curl.close()
+ return False
+ self._put_user_agent(curl)
+ return True
+
+ def _setcurltimeouts(self, curl, timeouts):
+ if not timeouts:
+ return
+ elif isinstance(timeouts, tuple):
+ conn_t, xfer_t = timeouts
+ else:
+ conn_t, xfer_t = (timeouts, timeouts)
+ curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
+ curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))
+
+ def _headerfunction(self, header_line):
+ header_line = header_line.decode('iso-8859-1')
+ if ':' in header_line:
+ name, value = header_line.split(':', 1)
+ name = name.strip().lower()
+ value = value.strip()
+ elif self._headers:
+ name = self._lastheadername
+ value = self._headers[name] + ' ' + header_line.strip()
+ elif header_line.startswith('HTTP/'):
+ name = 'x-status-line'
+ value = header_line
else:
- self.last_result = result
- self.success_flag = retry.check_http_response_success(result)
- return self.success_flag
+ _logger.error("Unexpected header line: %s", header_line)
+ return
+ self._lastheadername = name
+ self._headers[name] = value
+ # Returning None implies all bytes were written
class KeepWriterThread(threading.Thread):
@@ -407,9 +509,8 @@ class KeepClient(object):
self.args['data_hash'],
self.args['data'],
timeout=self.args.get('timeout', None)))
- status = self.service.last_status()
+ result = self.service.last_result()
if self._success:
- result = self.service.last_result
_logger.debug("KeepWriterThread %s succeeded %s+%i %s",
str(threading.current_thread()),
self.args['data_hash'],
@@ -420,14 +521,15 @@ class KeepClient(object):
# we're talking to a proxy or other backend that
# stores to multiple copies for us.
try:
- replicas_stored = int(result.headers['x-keep-replicas-stored'])
+ replicas_stored = int(result['headers']['x-keep-replicas-stored'])
except (KeyError, ValueError):
replicas_stored = 1
- limiter.save_response(result.content.strip(), replicas_stored)
- elif status is not None:
+ limiter.save_response(result['body'].strip(), replicas_stored)
+ elif result.get('status_code', None):
_logger.debug("Request fail: PUT %s => %s %s",
- self.args['data_hash'], status,
- self.service.last_result.content)
+ self.args['data_hash'],
+ result['status_code'],
+ result['body'])
def __init__(self, api_client=None, proxy=None,
@@ -484,10 +586,6 @@ class KeepClient(object):
The default number of times to retry failed requests.
This will be used as the default num_retries value when get() and
put() are called. Default 0.
-
- :session:
- The requests.Session object to use for get() and put() requests.
- Will create one if not specified.
"""
self.lock = threading.Lock()
if proxy is None:
@@ -506,6 +604,7 @@ class KeepClient(object):
self.block_cache = block_cache if block_cache else KeepBlockCache()
self.timeout = timeout
self.proxy_timeout = proxy_timeout
+ self._user_agent_pool = Queue.LifoQueue()
if local_store:
self.local_store = local_store
@@ -513,7 +612,6 @@ class KeepClient(object):
self.put = self.local_store_put
else:
self.num_retries = num_retries
- self.session = session if session is not None else requests.Session()
if proxy:
if not proxy.endswith('/'):
proxy += '/'
@@ -568,9 +666,13 @@ class KeepClient(object):
# Precompute the base URI for each service.
for r in accessible:
- r['_service_root'] = "{}://[{}]:{:d}/".format(
+ host = r['service_host']
+ if not host.startswith('[') and host.find(':') >= 0:
+ # IPv6 URIs must be formatted like http://[::1]:80/...
+ host = '[' + host + ']'
+ r['_service_root'] = "{}://{}:{:d}/".format(
'https' if r['service_ssl_flag'] else 'http',
- r['service_host'],
+ host,
r['service_port'])
# Gateway services are only used when specified by UUID,
@@ -638,7 +740,8 @@ class KeepClient(object):
local_roots = self.weighted_service_roots(locator, force_rebuild)
for root in local_roots:
if root not in roots_map:
- roots_map[root] = self.KeepService(root, self.session, **headers)
+ roots_map[root] = self.KeepService(
+ root, self._user_agent_pool, **headers)
return local_roots
@staticmethod
@@ -709,7 +812,10 @@ class KeepClient(object):
self._gateway_services.get(hint[2:])
)])
# Map root URLs to their KeepService objects.
- roots_map = {root: self.KeepService(root, self.session) for root in hint_roots}
+ roots_map = {
+ root: self.KeepService(root, self._user_agent_pool)
+ for root in hint_roots
+ }
# See #3147 for a discussion of the loop implementation. Highlights:
# * Refresh the list of Keep services after each failure, in case
@@ -750,8 +856,8 @@ class KeepClient(object):
# Q: Including 403 is necessary for the Keep tests to continue
# passing, but maybe they should expect KeepReadError instead?
not_founds = sum(1 for key in sorted_roots
- if roots_map[key].last_status() in {403, 404, 410})
- service_errors = ((key, roots_map[key].last_result)
+ if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
+ service_errors = ((key, roots_map[key].last_result()['error'])
for key in sorted_roots)
if not roots_map:
raise arvados.errors.KeepReadError(
@@ -835,9 +941,9 @@ class KeepClient(object):
"failed to write {}: no Keep services available ({})".format(
data_hash, loop.last_result()))
else:
- service_errors = ((key, roots_map[key].last_result)
+ service_errors = ((key, roots_map[key].last_result()['error'])
for key in local_roots
- if not roots_map[key].success_flag)
+ if roots_map[key].last_result()['error'])
raise arvados.errors.KeepWriteError(
"failed to write {} (wanted {} copies but wrote {})".format(
data_hash, copies, thread_limiter.done()), service_errors, label="service")
diff --git a/sdk/python/arvados/retry.py b/sdk/python/arvados/retry.py
index 52a68fa..e4ad644 100644
--- a/sdk/python/arvados/retry.py
+++ b/sdk/python/arvados/retry.py
@@ -2,6 +2,7 @@
import functools
import inspect
+import pycurl
import time
from collections import deque
@@ -109,11 +110,11 @@ class RetryLoop(object):
"queried loop results before any were recorded")
-def check_http_response_success(result):
- """Convert a 'requests' response to a loop control flag.
+def check_http_response_success(status_code):
+ """Convert an HTTP status code to a loop control flag.
- Pass this method a requests.Response object. It returns True if
- the response indicates success, None if it indicates temporary
+ Pass this method a numeric HTTP status code. It returns True if
+ the code indicates success, None if it indicates temporary
failure, and False otherwise. You can use this as the
success_check for a RetryLoop.
@@ -128,15 +129,11 @@ def check_http_response_success(result):
3xx status codes. They don't indicate success, and you can't
retry those requests verbatim.
"""
- try:
- status = result.status_code
- except Exception:
- return None
- if status in _HTTP_SUCCESSES:
+ if status_code in _HTTP_SUCCESSES:
return True
- elif status in _HTTP_CAN_RETRY:
+ elif status_code in _HTTP_CAN_RETRY:
return None
- elif 100 <= status < 600:
+ elif 100 <= status_code < 600:
return False
else:
return None # Get well soon, server.
diff --git a/sdk/python/setup.py b/sdk/python/setup.py
index ca28025..f27c28d 100644
--- a/sdk/python/setup.py
+++ b/sdk/python/setup.py
@@ -26,23 +26,24 @@ setup(name='arvados-python-client',
license='Apache 2.0',
packages=find_packages(),
scripts=[
- 'bin/arv-copy',
- 'bin/arv-get',
- 'bin/arv-keepdocker',
- 'bin/arv-ls',
- 'bin/arv-normalize',
- 'bin/arv-put',
- 'bin/arv-run',
- 'bin/arv-ws'
- ],
+ 'bin/arv-copy',
+ 'bin/arv-get',
+ 'bin/arv-keepdocker',
+ 'bin/arv-ls',
+ 'bin/arv-normalize',
+ 'bin/arv-put',
+ 'bin/arv-run',
+ 'bin/arv-ws'
+ ],
install_requires=[
- 'python-gflags',
- 'google-api-python-client',
- 'httplib2',
- 'requests>=2.4',
- 'urllib3',
- 'ws4py'
- ],
+ 'google-api-python-client',
+ 'httplib2',
+ 'pycurl',
+ 'python-gflags',
+ 'requests>=2.4',
+ 'urllib3',
+ 'ws4py'
+ ],
test_suite='tests',
tests_require=['mock>=1.0', 'PyYAML'],
zip_safe=False,
diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py
index a10802a..b4e97f6 100644
--- a/sdk/python/tests/arvados_testutil.py
+++ b/sdk/python/tests/arvados_testutil.py
@@ -8,8 +8,8 @@ import httplib2
import io
import mock
import os
+import pycurl
import Queue
-import requests
import shutil
import tempfile
import unittest
@@ -43,44 +43,80 @@ def mock_responses(body, *codes, **headers):
return mock.patch('httplib2.Http.request', side_effect=queue_with((
(fake_httplib2_response(code, **headers), body) for code in codes)))
-# fake_requests_response, mock_get_responses and mock_put_responses
-# mock calls to requests.get() and requests.put()
-def fake_requests_response(code, body, **headers):
- r = requests.Response()
- r.status_code = code
- r.reason = httplib.responses.get(code, "Unknown Response")
- r.headers = headers
- r.raw = io.BytesIO(body)
- return r
-
-# The following methods patch requests.Session(), where return_value is a mock
-# Session object. The put/get attributes are set on mock Session, and the
-# desired put/get behavior is set on the put/get mocks.
-
-def mock_put_responses(body, *codes, **headers):
- m = mock.MagicMock()
+
+class FakeCurl:
+ @classmethod
+ def make(cls, code, body='', headers={}):
+ return mock.Mock(spec=cls, wraps=cls(code, body, headers))
+
+ def __init__(self, code=200, body='', headers={}):
+ self._opt = {}
+ self._got_url = None
+ self._writer = None
+ self._headerfunction = None
+ self._resp_code = code
+ self._resp_body = body
+ self._resp_headers = headers
+
+ def getopt(self, opt):
+ return self._opt.get(str(opt), None)
+
+ def setopt(self, opt, val):
+ self._opt[str(opt)] = val
+ if opt == pycurl.WRITEDATA:
+ self._writer = val
+ elif opt == pycurl.HEADERFUNCTION:
+ self._headerfunction = val
+
+ def perform(self):
+ if not isinstance(self._resp_code, int):
+ raise self._resp_code
+ if self.getopt(pycurl.URL) is None:
+ raise ValueError
+ if self._writer is None:
+ raise ValueError
+ if self._headerfunction:
+ self._headerfunction("HTTP/1.1 {} Status".format(self._resp_code))
+ for k, v in self._resp_headers.iteritems():
+ self._headerfunction(k + ': ' + str(v))
+ self._writer.write(self._resp_body)
+
+ def close(self):
+ pass
+
+ def reset(self):
+ """Prevent fake UAs from going back into the user agent pool."""
+ raise Exception
+
+ def getinfo(self, opt):
+ if opt == pycurl.RESPONSE_CODE:
+ return self._resp_code
+ raise Exception
+
+def mock_keep_responses(body, *codes, **headers):
+ """Patch pycurl to return fake responses and raise exceptions.
+
+ body can be a string to return as the response body; an exception
+ to raise when perform() is called; or an iterable that returns a
+ sequence of such values.
+ """
+ cm = mock.MagicMock()
if isinstance(body, tuple):
codes = list(codes)
codes.insert(0, body)
- m.return_value.put.side_effect = queue_with((fake_requests_response(code, b, **headers) for b, code in codes))
+ responses = [
+ FakeCurl.make(code=code, body=b, headers=headers)
+ for b, code in codes
+ ]
else:
- m.return_value.put.side_effect = queue_with((fake_requests_response(code, body, **headers) for code in codes))
- return mock.patch('requests.Session', m)
-
-def mock_get_responses(body, *codes, **headers):
- m = mock.MagicMock()
- m.return_value.get.side_effect = queue_with((fake_requests_response(code, body, **headers) for code in codes))
- return mock.patch('requests.Session', m)
-
-def mock_get(side_effect):
- m = mock.MagicMock()
- m.return_value.get.side_effect = side_effect
- return mock.patch('requests.Session', m)
-
-def mock_put(side_effect):
- m = mock.MagicMock()
- m.return_value.put.side_effect = side_effect
- return mock.patch('requests.Session', m)
+ responses = [
+ FakeCurl.make(code=code, body=body, headers=headers)
+ for code in codes
+ ]
+ cm.side_effect = queue_with(responses)
+ cm.responses = responses
+ return mock.patch('pycurl.Curl', cm)
+
class MockStreamReader(object):
def __init__(self, name='.', *data):
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index d3198be..a397f44 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -551,7 +551,7 @@ class CollectionReaderTestCase(unittest.TestCase, CollectionTestMixin):
def test_locator_init(self):
client = self.api_client_mock(200)
# Ensure Keep will not return anything if asked.
- with tutil.mock_get_responses(None, 404):
+ with tutil.mock_keep_responses(None, 404):
reader = arvados.CollectionReader(self.DEFAULT_DATA_HASH,
api_client=client)
self.assertEqual(self.DEFAULT_MANIFEST, reader.manifest_text())
@@ -561,7 +561,7 @@ class CollectionReaderTestCase(unittest.TestCase, CollectionTestMixin):
# been written to Keep.
client = self.api_client_mock(200)
self.mock_get_collection(client, 404, None)
- with tutil.mock_get_responses(self.DEFAULT_MANIFEST, 200):
+ with tutil.mock_keep_responses(self.DEFAULT_MANIFEST, 200):
reader = arvados.CollectionReader(self.DEFAULT_DATA_HASH,
api_client=client)
self.assertEqual(self.DEFAULT_MANIFEST, reader.manifest_text())
@@ -569,7 +569,7 @@ class CollectionReaderTestCase(unittest.TestCase, CollectionTestMixin):
def test_uuid_init_no_fallback_to_keep(self):
# Do not look up a collection UUID in Keep.
client = self.api_client_mock(404)
- with tutil.mock_get_responses(self.DEFAULT_MANIFEST, 200):
+ with tutil.mock_keep_responses(self.DEFAULT_MANIFEST, 200):
with self.assertRaises(arvados.errors.ApiError):
reader = arvados.CollectionReader(self.DEFAULT_UUID,
api_client=client)
@@ -578,7 +578,7 @@ class CollectionReaderTestCase(unittest.TestCase, CollectionTestMixin):
# To verify that CollectionReader tries Keep first here, we
# mock API server to return the wrong data.
client = self.api_client_mock(200)
- with tutil.mock_get_responses(self.ALT_MANIFEST, 200):
+ with tutil.mock_keep_responses(self.ALT_MANIFEST, 200):
self.assertEqual(
self.ALT_MANIFEST,
arvados.CollectionReader(
@@ -590,7 +590,7 @@ class CollectionReaderTestCase(unittest.TestCase, CollectionTestMixin):
client = self.api_client_mock(200)
reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client,
num_retries=3)
- with tutil.mock_get_responses('foo', 500, 500, 200):
+ with tutil.mock_keep_responses('foo', 500, 500, 200):
self.assertEqual('foo',
''.join(f.read(9) for f in reader.all_files()))
@@ -630,7 +630,7 @@ class CollectionReaderTestCase(unittest.TestCase, CollectionTestMixin):
def test_api_response_with_collection_from_keep(self):
client = self.api_client_mock()
self.mock_get_collection(client, 404, 'foo')
- with tutil.mock_get_responses(self.DEFAULT_MANIFEST, 200):
+ with tutil.mock_keep_responses(self.DEFAULT_MANIFEST, 200):
reader = arvados.CollectionReader(self.DEFAULT_DATA_HASH,
api_client=client)
api_response = reader.api_response()
@@ -673,7 +673,7 @@ class CollectionReaderTestCase(unittest.TestCase, CollectionTestMixin):
class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
def mock_keep(self, body, *codes, **headers):
headers.setdefault('x-keep-replicas-stored', 2)
- return tutil.mock_put_responses(body, *codes, **headers)
+ return tutil.mock_keep_responses(body, *codes, **headers)
def foo_writer(self, **kwargs):
kwargs.setdefault('api_client', self.api_client_mock())
@@ -695,7 +695,7 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
def test_write_insufficient_replicas_via_proxy(self):
writer = self.foo_writer(replication=3)
- with self.mock_keep(None, 200, headers={'x-keep-replicas-stored': 2}):
+ with self.mock_keep(None, 200, **{'x-keep-replicas-stored': 2}):
with self.assertRaises(arvados.errors.KeepWriteError):
writer.manifest_text()
@@ -717,10 +717,7 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
self.mock_keep_services(client, status=200, service_type='disk', count=6)
writer = self.foo_writer(api_client=client, replication=3)
writer.manifest_text()
- # keepmock is the mock session constructor; keepmock.return_value
- # is the mock session object, and keepmock.return_value.put is the
- # actual mock method of interest.
- self.assertEqual(6, keepmock.return_value.put.call_count)
+ self.assertEqual(6, keepmock.call_count)
def test_write_whole_collection_through_retries(self):
writer = self.foo_writer(num_retries=2)
diff --git a/sdk/python/tests/test_errors.py b/sdk/python/tests/test_errors.py
index 6e8df96..db875dc 100644
--- a/sdk/python/tests/test_errors.py
+++ b/sdk/python/tests/test_errors.py
@@ -10,8 +10,8 @@ class KeepRequestErrorTestCase(unittest.TestCase):
REQUEST_ERRORS = [
('http://keep1.zzzzz.example.org/', IOError("test IOError")),
('http://keep3.zzzzz.example.org/', MemoryError("test MemoryError")),
- ('http://keep5.zzzzz.example.org/', tutil.fake_requests_response(
- 500, "test 500")),
+ ('http://keep5.zzzzz.example.org/',
+ arv_error.HttpError(500, "Internal Server Error")),
('http://keep7.zzzzz.example.org/', IOError("second test IOError")),
]
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index be13c55..0c42c2f 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -1,6 +1,7 @@
import hashlib
import mock
import os
+import pycurl
import random
import re
import socket
@@ -273,57 +274,59 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
def test_get_timeout(self):
api_client = self.mock_keep_services(count=1)
- force_timeout = [socket.timeout("timed out")]
- with tutil.mock_get(force_timeout) as mock_session:
+ force_timeout = socket.timeout("timed out")
+ with tutil.mock_keep_responses(force_timeout, 0) as mock:
keep_client = arvados.KeepClient(api_client=api_client)
with self.assertRaises(arvados.errors.KeepReadError):
keep_client.get('ffffffffffffffffffffffffffffffff')
- self.assertTrue(mock_session.return_value.get.called)
self.assertEqual(
- arvados.KeepClient.DEFAULT_TIMEOUT,
- mock_session.return_value.get.call_args[1]['timeout'])
+ mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
+ int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
+ self.assertEqual(
+ mock.responses[0].getopt(pycurl.TIMEOUT_MS),
+ int(arvados.KeepClient.DEFAULT_TIMEOUT[1]*1000))
def test_put_timeout(self):
api_client = self.mock_keep_services(count=1)
- force_timeout = [socket.timeout("timed out")]
- with tutil.mock_put(force_timeout) as mock_session:
+ force_timeout = socket.timeout("timed out")
+ with tutil.mock_keep_responses(force_timeout, 0) as mock:
keep_client = arvados.KeepClient(api_client=api_client)
with self.assertRaises(arvados.errors.KeepWriteError):
keep_client.put('foo')
- self.assertTrue(mock_session.return_value.put.called)
self.assertEqual(
- arvados.KeepClient.DEFAULT_TIMEOUT,
- mock_session.return_value.put.call_args[1]['timeout'])
+ mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
+ int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
+ self.assertEqual(
+ mock.responses[0].getopt(pycurl.TIMEOUT_MS),
+ int(arvados.KeepClient.DEFAULT_TIMEOUT[1]*1000))
def test_proxy_get_timeout(self):
- # Force a timeout, verifying that the requests.get or
- # requests.put method was called with the proxy_timeout
- # setting rather than the default timeout.
api_client = self.mock_keep_services(service_type='proxy', count=1)
- force_timeout = [socket.timeout("timed out")]
- with tutil.mock_get(force_timeout) as mock_session:
+ force_timeout = socket.timeout("timed out")
+ with tutil.mock_keep_responses(force_timeout, 0) as mock:
keep_client = arvados.KeepClient(api_client=api_client)
with self.assertRaises(arvados.errors.KeepReadError):
keep_client.get('ffffffffffffffffffffffffffffffff')
- self.assertTrue(mock_session.return_value.get.called)
self.assertEqual(
- arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
- mock_session.return_value.get.call_args[1]['timeout'])
+ mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
+ int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
+ self.assertEqual(
+ mock.responses[0].getopt(pycurl.TIMEOUT_MS),
+ int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]*1000))
def test_proxy_put_timeout(self):
- # Force a timeout, verifying that the requests.get or
- # requests.put method was called with the proxy_timeout
- # setting rather than the default timeout.
api_client = self.mock_keep_services(service_type='proxy', count=1)
- force_timeout = [socket.timeout("timed out")]
- with tutil.mock_put(force_timeout) as mock_session:
+ force_timeout = socket.timeout("timed out")
+ with tutil.mock_keep_responses(force_timeout, 0) as mock:
keep_client = arvados.KeepClient(api_client=api_client)
with self.assertRaises(arvados.errors.KeepWriteError):
keep_client.put('foo')
- self.assertTrue(mock_session.return_value.put.called)
self.assertEqual(
- arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
- mock_session.return_value.put.call_args[1]['timeout'])
+ mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
+ int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
+ self.assertEqual(
+ mock.responses[0].getopt(pycurl.TIMEOUT_MS),
+ int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]*1000))
def test_probe_order_reference_set(self):
# expected_order[i] is the probe order for
@@ -397,9 +400,9 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
aport = random.randint(1024,65535)
api_client = self.mock_keep_services(service_port=aport, count=16)
keep_client = arvados.KeepClient(api_client=api_client)
- with mock.patch('requests.' + verb,
- side_effect=socket.timeout) as req_mock, \
- self.assertRaises(exc_class) as err_check:
+ with mock.patch('pycurl.Curl') as curl_mock, \
+ self.assertRaises(exc_class) as err_check:
+ curl_mock.return_value.side_effect = socket.timeout
getattr(keep_client, verb)(data)
urls = [urlparse.urlparse(url)
for url in err_check.exception.request_errors()]
@@ -429,7 +432,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
def check_errors_from_last_retry(self, verb, exc_class):
api_client = self.mock_keep_services(count=2)
- req_mock = getattr(tutil, 'mock_{}_responses'.format(verb))(
+ req_mock = tutil.mock_keep_responses(
"retry error reporting test", 500, 500, 403, 403)
with req_mock, tutil.skip_sleep, \
self.assertRaises(exc_class) as err_check:
@@ -450,7 +453,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
data = 'partial failure test'
data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
api_client = self.mock_keep_services(count=3)
- with tutil.mock_put_responses(data_loc, 200, 500, 500) as req_mock, \
+ with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
keep_client = arvados.KeepClient(api_client=api_client)
keep_client.put(data)
@@ -468,28 +471,31 @@ class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
'service_type': 'gateway:test',
} for i in range(gateways)]
self.gateway_roots = [
- "https://[{service_host}]:{service_port}/".format(**gw)
+ "https://{service_host}:{service_port}/".format(**gw)
for gw in self.gateways]
self.api_client = self.mock_keep_services(
count=disks, additional_services=self.gateways)
self.keepClient = arvados.KeepClient(api_client=self.api_client)
- @mock.patch('requests.Session')
- def test_get_with_gateway_hint_first(self, MockSession):
- MockSession.return_value.get.return_value = tutil.fake_requests_response(
+ @mock.patch('pycurl.Curl')
+ def test_get_with_gateway_hint_first(self, MockCurl):
+ MockCurl.return_value = tutil.FakeCurl.make(
code=200, body='foo', headers={'Content-Length': 3})
self.mock_disks_and_gateways()
locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
self.assertEqual('foo', self.keepClient.get(locator))
- self.assertEqual((self.gateway_roots[0]+locator,),
- MockSession.return_value.get.call_args_list[0][0])
+ self.assertEqual(self.gateway_roots[0]+locator,
+ MockCurl.return_value.getopt(pycurl.URL))
- @mock.patch('requests.Session')
- def test_get_with_gateway_hints_in_order(self, MockSession):
+ @mock.patch('pycurl.Curl')
+ def test_get_with_gateway_hints_in_order(self, MockCurl):
gateways = 4
disks = 3
- MockSession.return_value.get.return_value = tutil.fake_requests_response(
- code=404, body='')
+ mocks = [
+ tutil.FakeCurl.make(code=404, body='')
+ for _ in range(gateways+disks)
+ ]
+ MockCurl.side_effect = tutil.queue_with(mocks)
self.mock_disks_and_gateways(gateways=gateways, disks=disks)
locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
['K@'+gw['uuid'] for gw in self.gateways])
@@ -497,23 +503,23 @@ class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
self.keepClient.get(locator)
# Gateways are tried first, in the order given.
for i, root in enumerate(self.gateway_roots):
- self.assertEqual((root+locator,),
- MockSession.return_value.get.call_args_list[i][0])
+ self.assertEqual(root+locator,
+ mocks[i].getopt(pycurl.URL))
# Disk services are tried next.
for i in range(gateways, gateways+disks):
self.assertRegexpMatches(
- MockSession.return_value.get.call_args_list[i][0][0],
+ mocks[i].getopt(pycurl.URL),
r'keep0x')
- @mock.patch('requests.Session')
- def test_get_with_remote_proxy_hint(self, MockSession):
- MockSession.return_value.get.return_value = tutil.fake_requests_response(
+ @mock.patch('pycurl.Curl')
+ def test_get_with_remote_proxy_hint(self, MockCurl):
+ MockCurl.return_value = tutil.FakeCurl.make(
code=200, body='foo', headers={'Content-Length': 3})
self.mock_disks_and_gateways()
locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
self.assertEqual('foo', self.keepClient.get(locator))
- self.assertEqual(('https://keep.xyzzy.arvadosapi.com/'+locator,),
- MockSession.return_value.get.call_args_list[0][0])
+ self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
+ MockCurl.return_value.getopt(pycurl.URL))
class KeepClientRetryTestMixin(object):
@@ -587,14 +593,14 @@ class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
DEFAULT_EXCEPTION = arvados.errors.KeepReadError
HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
- TEST_PATCHER = staticmethod(tutil.mock_get_responses)
+ TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
*args, **kwargs):
return self.new_client().get(locator, *args, **kwargs)
def test_specific_exception_when_not_found(self):
- with tutil.mock_get_responses(self.DEFAULT_EXPECT, 404, 200):
+ with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
self.check_exception(arvados.errors.NotFoundError, num_retries=3)
def test_general_exception_with_mixed_errors(self):
@@ -603,7 +609,7 @@ class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
# This test rigs up 50/50 disagreement between two servers, and
# checks that it does not become a NotFoundError.
client = self.new_client()
- with tutil.mock_get_responses(self.DEFAULT_EXPECT, 404, 500):
+ with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
client.get(self.HINTED_LOCATOR)
self.assertNotIsInstance(
@@ -611,17 +617,19 @@ class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
"mixed errors raised NotFoundError")
def test_hint_server_can_succeed_without_retries(self):
- with tutil.mock_get_responses(self.DEFAULT_EXPECT, 404, 200, 500):
+ with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
self.check_success(locator=self.HINTED_LOCATOR)
def test_try_next_server_after_timeout(self):
- with tutil.mock_get([
- socket.timeout("timed out"),
- tutil.fake_requests_response(200, self.DEFAULT_EXPECT)]):
+ with tutil.mock_keep_responses(
+ (socket.timeout("timed out"), 200),
+ (self.DEFAULT_EXPECT, 200)):
self.check_success(locator=self.HINTED_LOCATOR)
def test_retry_data_with_wrong_checksum(self):
- with tutil.mock_get((tutil.fake_requests_response(200, s) for s in ['baddata', self.TEST_DATA])):
+ with tutil.mock_keep_responses(
+ ('baddata', 200),
+ (self.DEFAULT_EXPECT, 200)):
self.check_success(locator=self.HINTED_LOCATOR)
@@ -629,12 +637,12 @@ class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
- TEST_PATCHER = staticmethod(tutil.mock_put_responses)
+ TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
copies=1, *args, **kwargs):
return self.new_client().put(data, copies, *args, **kwargs)
def test_do_not_send_multiple_copies_to_same_server(self):
- with tutil.mock_put_responses(self.DEFAULT_EXPECT, 200):
+ with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
self.check_exception(copies=2, num_retries=3)
diff --git a/sdk/python/tests/test_retry.py b/sdk/python/tests/test_retry.py
index 0c1110c..4f147ba 100644
--- a/sdk/python/tests/test_retry.py
+++ b/sdk/python/tests/test_retry.py
@@ -7,8 +7,6 @@ import arvados.errors as arv_error
import arvados.retry as arv_retry
import mock
-from arvados_testutil import fake_requests_response
-
class RetryLoopTestMixin(object):
@staticmethod
def loop_success(result):
@@ -150,8 +148,7 @@ class RetryLoopBackoffTestCase(unittest.TestCase, RetryLoopTestMixin):
class CheckHTTPResponseSuccessTestCase(unittest.TestCase):
def results_map(self, *codes):
for code in codes:
- response = fake_requests_response(code, None)
- yield code, arv_retry.check_http_response_success(response)
+ yield code, arv_retry.check_http_response_success(code)
def check(assert_name):
def check_method(self, expected, *codes):
diff --git a/sdk/python/tests/test_stream.py b/sdk/python/tests/test_stream.py
index acb9929..e90f602 100644
--- a/sdk/python/tests/test_stream.py
+++ b/sdk/python/tests/test_stream.py
@@ -199,47 +199,47 @@ class StreamRetryTestMixin(object):
@tutil.skip_sleep
def test_success_without_retries(self):
- with tutil.mock_get_responses('bar', 200):
+ with tutil.mock_keep_responses('bar', 200):
reader = self.reader_for('bar_file')
self.assertEqual('bar', self.read_for_test(reader, 3))
@tutil.skip_sleep
def test_read_no_default_retry(self):
- with tutil.mock_get_responses('', 500):
+ with tutil.mock_keep_responses('', 500):
reader = self.reader_for('user_agreement')
with self.assertRaises(arvados.errors.KeepReadError):
self.read_for_test(reader, 10)
@tutil.skip_sleep
def test_read_with_instance_retries(self):
- with tutil.mock_get_responses('foo', 500, 200):
+ with tutil.mock_keep_responses('foo', 500, 200):
reader = self.reader_for('foo_file', num_retries=3)
self.assertEqual('foo', self.read_for_test(reader, 3))
@tutil.skip_sleep
def test_read_with_method_retries(self):
- with tutil.mock_get_responses('foo', 500, 200):
+ with tutil.mock_keep_responses('foo', 500, 200):
reader = self.reader_for('foo_file')
self.assertEqual('foo',
self.read_for_test(reader, 3, num_retries=3))
@tutil.skip_sleep
def test_read_instance_retries_exhausted(self):
- with tutil.mock_get_responses('bar', 500, 500, 500, 500, 200):
+ with tutil.mock_keep_responses('bar', 500, 500, 500, 500, 200):
reader = self.reader_for('bar_file', num_retries=3)
with self.assertRaises(arvados.errors.KeepReadError):
self.read_for_test(reader, 3)
@tutil.skip_sleep
def test_read_method_retries_exhausted(self):
- with tutil.mock_get_responses('bar', 500, 500, 500, 500, 200):
+ with tutil.mock_keep_responses('bar', 500, 500, 500, 500, 200):
reader = self.reader_for('bar_file')
with self.assertRaises(arvados.errors.KeepReadError):
self.read_for_test(reader, 3, num_retries=3)
@tutil.skip_sleep
def test_method_retries_take_precedence(self):
- with tutil.mock_get_responses('', 500, 500, 500, 200):
+ with tutil.mock_keep_responses('', 500, 500, 500, 200):
reader = self.reader_for('user_agreement', num_retries=10)
with self.assertRaises(arvados.errors.KeepReadError):
self.read_for_test(reader, 10, num_retries=1)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list