[ARVADOS] created: 20fc2f783478fb438fb0a6eee193909f899f139b

Git user git at public.curoverse.com
Wed Apr 20 15:54:25 EDT 2016


        at  20fc2f783478fb438fb0a6eee193909f899f139b (commit)


commit 20fc2f783478fb438fb0a6eee193909f899f139b
Author: radhika <radhika at curoverse.com>
Date:   Wed Apr 20 15:54:03 2016 -0400

    8937: add head request to python keep client.

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index cd39f83..4cd7dc3 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -199,19 +199,20 @@ class KeepBlockCache(object):
         with self._cache_lock:
             return self._get(locator)
 
-    def reserve_cache(self, locator):
+    def reserve_cache(self, locator, reserve=True):
         '''Reserve a cache slot for the specified locator,
         or return the existing slot.'''
         with self._cache_lock:
             n = self._get(locator)
             if n:
                 return n, False
-            else:
+            elif reserve == True:
                 # Add a new cache slot for the locator
                 n = KeepBlockCache.CacheSlot(locator)
                 self._cache.insert(0, n)
                 return n, True
-
+            else:
+                return None, False
 
 class Counter(object):
     def __init__(self, v=0):
@@ -374,10 +375,10 @@ class KeepClient(object):
             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
             return s
 
-        def get(self, locator, timeout=None):
+        def get(self, locator, method="GET", timeout=None):
             # locator is a KeepLocator object.
             url = self.root + str(locator)
-            _logger.debug("Request: GET %s", url)
+            _logger.debug("Request: %s %s", method, url)
             curl = self._get_user_agent()
             ok = None
             try:
@@ -391,7 +392,10 @@ class KeepClient(object):
                         '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+                    if method == "HEAD":
+                        curl.setopt(pycurl.NOBODY, True)
                     self._setcurltimeouts(curl, timeout)
+
                     try:
                         curl.perform()
                     except Exception as e:
@@ -402,6 +406,7 @@ class KeepClient(object):
                         'headers': self._headers,
                         'error': False,
                     }
+
                 ok = retry.check_http_response_success(self._result['status_code'])
                 if not ok:
                     self._result['error'] = arvados.errors.HttpError(
@@ -425,11 +430,21 @@ class KeepClient(object):
                 _logger.debug("Request fail: GET %s => %s: %s",
                               url, type(self._result['error']), str(self._result['error']))
                 return None
+            if method == "HEAD":
+                _logger.info("HEAD %s: %s bytes",
+                         self._result['status_code'],
+                         self._headers.get('content-length'))
+                content_len = self._headers.get('content-length')
+                if content_len is None:
+                    content_len = self._result['body']
+                return str(content_len)
+
             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                          self._result['status_code'],
                          len(self._result['body']),
                          t.msecs,
                          (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+
             if self.download_counter:
                 self.download_counter.add(len(self._result['body']))
             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
@@ -871,7 +886,14 @@ class KeepClient(object):
             return None
 
     @retry.retry_method
+    def head(self, loc_s, num_retries=None):
+        return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)
+
+    @retry.retry_method
     def get(self, loc_s, num_retries=None):
+        return self._get_or_head(loc_s, method="GET", num_retries=num_retries)
+
+    def _get_or_head(self, loc_s, method="GET", num_retries=None):
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
@@ -897,11 +919,14 @@ class KeepClient(object):
         self.get_counter.add(1)
 
         locator = KeepLocator(loc_s)
-        slot, first = self.block_cache.reserve_cache(locator.md5sum)
-        if not first:
+        slot, first = self.block_cache.reserve_cache(locator.md5sum, True if method == "GET" else False)
+        if not first and slot is not None:
             self.hits_counter.add(1)
             v = slot.get()
-            return v
+            if method == "HEAD":
+                return str(len(v))
+            else:
+                return v
 
         self.misses_counter.add(1)
 
@@ -951,14 +976,15 @@ class KeepClient(object):
                                for root in sorted_roots
                                if roots_map[root].usable()]
             for keep_service in services_to_try:
-                blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
+                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                 if blob is not None:
                     break
             loop.save_result((blob, len(services_to_try)))
 
         # Always cache the result, then return it if we succeeded.
-        slot.set(blob)
-        self.block_cache.cap_cache()
+        if method == "GET":
+            slot.set(blob)
+            self.block_cache.cap_cache()
         if loop.success():
             return blob
 
diff --git a/sdk/python/tests/keepstub.py b/sdk/python/tests/keepstub.py
index f074f8d..d79788c 100644
--- a/sdk/python/tests/keepstub.py
+++ b/sdk/python/tests/keepstub.py
@@ -121,6 +121,20 @@ class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
         self.wfile_bandwidth_write(self.server.store[datahash])
         self.server._do_delay('response_close')
 
+    def do_HEAD(self):
+        self.server._do_delay('response')
+        r = re.search(r'[0-9a-f]{32}', self.path)
+        if not r:
+            return self.send_response(422)
+        datahash = r.group(0)
+        if datahash not in self.server.store:
+            return self.send_response(404)
+        self.send_response(200)
+        self.send_header('Content-type', 'application/octet-stream')
+        self.send_header('Content-length', str(len(self.server.store[datahash])))
+        self.end_headers()
+        self.server._do_delay('response_close')
+
     def do_PUT(self):
         self.server._do_delay('request_body')
         # The comments at https://bugs.python.org/issue1491 implies that Python
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 5cba8cc..9e9b76b 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -104,6 +104,17 @@ class KeepTestCase(run_test_server.TestCaseWithServers):
             # Must be a string type
             self.keep_client.put({})
 
+    def test_KeepHeadTest(self):
+        locator = self.keep_client.put('test_head')
+        self.assertRegexpMatches(
+            locator,
+            '^b9a772c7049325feb7130fff1f8333e9\+9',
+            'wrong md5 hash from Keep.put for "test_head": ' + locator)
+        self.assertEqual('9', self.keep_client.head(locator))
+        self.assertEqual(self.keep_client.get(locator),
+                         'test_head',
+                         'wrong content from Keep.get for "test_head"')
+
 class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
     MAIN_SERVER = {}
     KEEP_SERVER = {'blob_signing_key': 'abcdefghijk0123456789',
@@ -318,6 +329,23 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
                 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                 int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
 
+    def test_head_timeout(self):
+        api_client = self.mock_keep_services(count=1)
+        force_timeout = socket.timeout("timed out")
+        with tutil.mock_keep_responses(force_timeout, 0) as mock:
+            keep_client = arvados.KeepClient(api_client=api_client)
+            with self.assertRaises(arvados.errors.KeepReadError):
+                keep_client.head('ffffffffffffffffffffffffffffffff')
+            self.assertEqual(
+                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
+                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
+            self.assertEqual(
+                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
+                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
+            self.assertEqual(
+                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
+                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
+
     def test_proxy_get_timeout(self):
         api_client = self.mock_keep_services(service_type='proxy', count=1)
         force_timeout = socket.timeout("timed out")
@@ -335,6 +363,23 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
                 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
 
+    def test_proxy_head_timeout(self):
+        api_client = self.mock_keep_services(service_type='proxy', count=1)
+        force_timeout = socket.timeout("timed out")
+        with tutil.mock_keep_responses(force_timeout, 0) as mock:
+            keep_client = arvados.KeepClient(api_client=api_client)
+            with self.assertRaises(arvados.errors.KeepReadError):
+                keep_client.head('ffffffffffffffffffffffffffffffff')
+            self.assertEqual(
+                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
+                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
+            self.assertEqual(
+                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
+                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
+            self.assertEqual(
+                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
+                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
+
     def test_proxy_put_timeout(self):
         api_client = self.mock_keep_services(service_type='proxy', count=1)
         force_timeout = socket.timeout("timed out")
@@ -364,6 +409,9 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
     def test_get_error_with_no_services(self):
         self.check_no_services_error('get', arvados.errors.KeepReadError)
 
+    def test_head_error_with_no_services(self):
+        self.check_no_services_error('head', arvados.errors.KeepReadError)
+
     def test_put_error_with_no_services(self):
         self.check_no_services_error('put', arvados.errors.KeepWriteError)
 
@@ -383,6 +431,9 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
     def test_get_error_reflects_last_retry(self):
         self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)
 
+    def test_head_error_reflects_last_retry(self):
+        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)
+
     def test_put_error_reflects_last_retry(self):
         self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)
 
@@ -477,6 +528,10 @@ class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
         self._test_probe_order_against_reference_set(
             lambda i: self.keep_client.get(self.hashes[i], num_retries=1))
 
+    def test_head_probe_order_against_reference_set(self):
+        self._test_probe_order_against_reference_set(
+            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))
+
     def test_put_probe_order_against_reference_set(self):
         # copies=1 prevents the test from being sensitive to races
         # between writer threads.
@@ -687,6 +742,9 @@ class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
         with self.assertTakesGreater(self.TIMEOUT_TIME):
             with self.assertRaises(arvados.errors.KeepWriteError):
                 kc.put(self.DATA, copies=1, num_retries=0)
+        with self.assertTakesGreater(self.TIMEOUT_TIME):
+            with self.assertRaises(arvados.errors.KeepReadError) as e:
+                kc.head(loc, num_retries=0)
 
     def test_low_bandwidth_with_server_mid_delay_failure(self):
         kc = self.keepClient()
@@ -769,6 +827,7 @@ class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
         self.assertEqual('foo', self.keepClient.get(locator))
         self.assertEqual(self.gateway_roots[0]+locator,
                          MockCurl.return_value.getopt(pycurl.URL))
+        self.assertEqual('3', self.keepClient.head(locator))
 
     @mock.patch('pycurl.Curl')
     def test_get_with_gateway_hints_in_order(self, MockCurl):
@@ -795,6 +854,30 @@ class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
                 r'keep0x')
 
     @mock.patch('pycurl.Curl')
+    def test_head_with_gateway_hints_in_order(self, MockCurl):
+        gateways = 4
+        disks = 3
+        mocks = [
+            tutil.FakeCurl.make(code=404, body='')
+            for _ in range(gateways+disks)
+        ]
+        MockCurl.side_effect = tutil.queue_with(mocks)
+        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
+        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
+                           ['K@'+gw['uuid'] for gw in self.gateways])
+        with self.assertRaises(arvados.errors.NotFoundError):
+            self.keepClient.head(locator)
+        # Gateways are tried first, in the order given.
+        for i, root in enumerate(self.gateway_roots):
+            self.assertEqual(root+locator,
+                             mocks[i].getopt(pycurl.URL))
+        # Disk services are tried next.
+        for i in range(gateways, gateways+disks):
+            self.assertRegexpMatches(
+                mocks[i].getopt(pycurl.URL),
+                r'keep0x')
+
+    @mock.patch('pycurl.Curl')
     def test_get_with_remote_proxy_hint(self, MockCurl):
         MockCurl.return_value = tutil.FakeCurl.make(
             code=200, body='foo', headers={'Content-Length': 3})
@@ -804,6 +887,16 @@ class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
         self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                          MockCurl.return_value.getopt(pycurl.URL))
 
+    @mock.patch('pycurl.Curl')
+    def test_head_with_remote_proxy_hint(self, MockCurl):
+        MockCurl.return_value = tutil.FakeCurl.make(
+            code=200, body='foo', headers={'Content-Length': 3})
+        self.mock_disks_and_gateways()
+        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K at xyzzy'
+        self.assertEqual(3, int(self.keepClient.head(locator)))
+        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
+                         MockCurl.return_value.getopt(pycurl.URL))
+
 
 class KeepClientRetryTestMixin(object):
     # Testing with a local Keep store won't exercise the retry behavior.
@@ -919,6 +1012,43 @@ class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
                 (self.DEFAULT_EXPECT, 200)):
             self.check_success(locator=self.HINTED_LOCATOR)
 
+ at tutil.skip_sleep
+class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
+    DEFAULT_EXPECT = str(len(KeepClientRetryTestMixin.TEST_DATA))
+    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
+    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K at xyzzy'
+    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
+
+    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
+                   *args, **kwargs):
+        return self.new_client().head(locator, *args, **kwargs)
+
+    def test_specific_exception_when_not_found(self):
+        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
+            self.check_exception(arvados.errors.NotFoundError, num_retries=3)
+
+    def test_general_exception_with_mixed_errors(self):
+        # head should raise a NotFoundError if no server returns the block,
+        # and a high threshold of servers report that it's not found.
+        # This test rigs up 50/50 disagreement between two servers, and
+        # checks that it does not become a NotFoundError.
+        client = self.new_client()
+        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
+            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
+                client.head(self.HINTED_LOCATOR)
+            self.assertNotIsInstance(
+                exc_check.exception, arvados.errors.NotFoundError,
+                "mixed errors raised NotFoundError")
+
+    def test_hint_server_can_succeed_without_retries(self):
+        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
+            self.check_success(locator=self.HINTED_LOCATOR)
+
+    def test_try_next_server_after_timeout(self):
+        with tutil.mock_keep_responses(
+                (socket.timeout("timed out"), 200),
+                (self.DEFAULT_EXPECT, 200)):
+            self.check_success(locator=self.HINTED_LOCATOR)
 
 @tutil.skip_sleep
 class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list