[ARVADOS] updated: e62a18f3786d0f3c12865f865294a0f4d39ff548

Git user git at public.curoverse.com
Mon May 2 14:37:03 EDT 2016


Summary of changes:
 sdk/python/arvados/commands/put.py   |  18 +++++
 sdk/python/arvados/keep.py           |  46 +++++++++----
 sdk/python/tests/arvados_testutil.py |   3 +-
 sdk/python/tests/keepstub.py         |  14 ++++
 sdk/python/tests/test_arv_put.py     |  37 ++++++++++
 sdk/python/tests/test_keep_client.py | 130 +++++++++++++++++++++++++++++++++++
 6 files changed, 235 insertions(+), 13 deletions(-)

       via  e62a18f3786d0f3c12865f865294a0f4d39ff548 (commit)
       via  d8c84ea91f8c4a26860e637197f234f2dd909abb (commit)
       via  5e991889ca6c18cdd901b98b5083c4cee7d260c9 (commit)
       via  e09af1d4876cb1295785db9abb5f3d2d10323097 (commit)
       via  6599824f22cdb45ab30657ac95071aa0beeee08b (commit)
       via  b6e5a10028551a1c2b1379c5c8ed039582434f25 (commit)
       via  ec9df4864de8033b4efd8b1cecfd1875fefc303e (commit)
       via  f22692a8fdf6610045db0c34c4827a0ebcb0ae0a (commit)
      from  4c360c5a3a9564f584dac973810059d2d45d08ef (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit e62a18f3786d0f3c12865f865294a0f4d39ff548
Merge: 4c360c5 d8c84ea
Author: radhika <radhika at curoverse.com>
Date:   Mon May 2 14:36:02 2016 -0400

    refs #8937
    Merge branch '8937-arv-put-cache-check'


commit d8c84ea91f8c4a26860e637197f234f2dd909abb
Author: radhika <radhika at curoverse.com>
Date:   Wed Apr 27 11:56:01 2016 -0400

    8937: refactor cache check logic into a check_cache method and update all references.

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 8fa1c8f..d3510db 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -197,25 +197,10 @@ class ResumeCacheConflict(Exception):
 class ResumeCache(object):
     CACHE_DIR = '.cache/arvados/arv-put'
 
-    def __init__(self, file_spec, api_client=None, num_retries=0):
+    def __init__(self, file_spec):
         self.cache_file = open(file_spec, 'a+')
         self._lock_file(self.cache_file)
         self.filename = self.cache_file.name
-        try:
-            state = self.load()
-            locator = None
-            try:
-                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
-                    locator = state["_finished_streams"][0][1][0]
-                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
-                    locator = state["_current_stream_locators"][0]
-                if locator is not None:
-                    kc = arvados.keep.KeepClient(api_client=api_client)
-                    kc.head(locator, num_retries=num_retries)
-            except Exception as e:
-                raise arvados.errors.KeepRequestError("Head request error for {}: {}".format(locator, e))
-        except (ValueError):
-            pass
 
     @classmethod
     def make_path(cls, args):
@@ -241,6 +226,23 @@ class ResumeCache(object):
         self.cache_file.seek(0)
         return json.load(self.cache_file)
 
+    def check_cache(self, api_client=None, num_retries=0):
+        try:
+            state = self.load()
+            locator = None
+            try:
+                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
+                    locator = state["_finished_streams"][0][1][0]
+                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
+                    locator = state["_current_stream_locators"][0]
+                if locator is not None:
+                    kc = arvados.keep.KeepClient(api_client=api_client)
+                    kc.head(locator, num_retries=num_retries)
+            except Exception as e:
+                self.restart()
+        except (ValueError):
+            pass
+
     def save(self, data):
         try:
             new_cache_fd, new_cache_name = tempfile.mkstemp(
@@ -452,14 +454,10 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     resume_cache = None
     if args.resume:
         try:
-            cachepath = ResumeCache.make_path(args)
-            resume_cache = ResumeCache(cachepath, api_client=api_client, num_retries=args.retries)
+            resume_cache = ResumeCache(ResumeCache.make_path(args))
+            resume_cache.check_cache(api_client=api_client, num_retries=args.retries)
         except (IOError, OSError, ValueError):
             pass  # Couldn't open cache directory/file.  Continue without it.
-        except arvados.errors.KeepRequestError:
-            # delete the cache and create a new one
-            shutil.rmtree(cachepath)
-            resume_cache = ResumeCache(cachepath)
         except ResumeCacheConflict:
             print >>stderr, "\n".join([
                 "arv-put: Another process is already uploading this data.",
diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
index f1ed35a..a6c1233 100644
--- a/sdk/python/tests/test_arv_put.py
+++ b/sdk/python/tests/test_arv_put.py
@@ -160,8 +160,9 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
             self.last_cache = arv_put.ResumeCache(cachefile.name)
         self.last_cache.save(thing)
         self.last_cache.close()
-        with self.assertRaises(arvados.errors.KeepRequestError):
-            arv_put.ResumeCache(self.last_cache.filename)
+        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
+        self.assertNotEqual(None, resume_cache)
+        self.assertRaises(None, resume_cache.check_cache())
 
     def test_basic_cache_storage(self):
         thing = ['test', 'list']

commit 5e991889ca6c18cdd901b98b5083c4cee7d260c9
Author: radhika <radhika at curoverse.com>
Date:   Mon Apr 25 09:36:19 2016 -0400

    8937: invalidate the cache and create a new one if the HEAD request fails while initializing ResumeCache.

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 6fa26c6..8fa1c8f 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -197,10 +197,25 @@ class ResumeCacheConflict(Exception):
 class ResumeCache(object):
     CACHE_DIR = '.cache/arvados/arv-put'
 
-    def __init__(self, file_spec):
+    def __init__(self, file_spec, api_client=None, num_retries=0):
         self.cache_file = open(file_spec, 'a+')
         self._lock_file(self.cache_file)
         self.filename = self.cache_file.name
+        try:
+            state = self.load()
+            locator = None
+            try:
+                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
+                    locator = state["_finished_streams"][0][1][0]
+                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
+                    locator = state["_current_stream_locators"][0]
+                if locator is not None:
+                    kc = arvados.keep.KeepClient(api_client=api_client)
+                    kc.head(locator, num_retries=num_retries)
+            except Exception as e:
+                raise arvados.errors.KeepRequestError("Head request error for {}: {}".format(locator, e))
+        except (ValueError):
+            pass
 
     @classmethod
     def make_path(cls, args):
@@ -437,9 +452,14 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     resume_cache = None
     if args.resume:
         try:
-            resume_cache = ResumeCache(ResumeCache.make_path(args))
+            cachepath = ResumeCache.make_path(args)
+            resume_cache = ResumeCache(cachepath, api_client=api_client, num_retries=args.retries)
         except (IOError, OSError, ValueError):
             pass  # Couldn't open cache directory/file.  Continue without it.
+        except arvados.errors.KeepRequestError:
+            # delete the cache and create a new one
+            shutil.rmtree(cachepath)
+            resume_cache = ResumeCache(cachepath)
         except ResumeCacheConflict:
             print >>stderr, "\n".join([
                 "arv-put: Another process is already uploading this data.",
diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
index 896b880..f1ed35a 100644
--- a/sdk/python/tests/test_arv_put.py
+++ b/sdk/python/tests/test_arv_put.py
@@ -127,6 +127,42 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
             else:
                 config['ARVADOS_API_HOST'] = orig_host
 
+    @mock.patch('arvados.keep.KeepClient.head')
+    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
+        keep_client_head.side_effect = [True]
+        thing = {}
+        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
+        with tempfile.NamedTemporaryFile() as cachefile:
+            self.last_cache = arv_put.ResumeCache(cachefile.name)
+        self.last_cache.save(thing)
+        self.last_cache.close()
+        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
+        self.assertNotEqual(None, resume_cache)
+
+    @mock.patch('arvados.keep.KeepClient.head')
+    def test_resume_cache_with_finished_streams(self, keep_client_head):
+        keep_client_head.side_effect = [True]
+        thing = {}
+        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
+        with tempfile.NamedTemporaryFile() as cachefile:
+            self.last_cache = arv_put.ResumeCache(cachefile.name)
+        self.last_cache.save(thing)
+        self.last_cache.close()
+        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
+        self.assertNotEqual(None, resume_cache)
+
+    @mock.patch('arvados.keep.KeepClient.head')
+    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
+        keep_client_head.side_effect = Exception('Locator not found')
+        thing = {}
+        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
+        with tempfile.NamedTemporaryFile() as cachefile:
+            self.last_cache = arv_put.ResumeCache(cachefile.name)
+        self.last_cache.save(thing)
+        self.last_cache.close()
+        with self.assertRaises(arvados.errors.KeepRequestError):
+            arv_put.ResumeCache(self.last_cache.filename)
+
     def test_basic_cache_storage(self):
         thing = ['test', 'list']
         with tempfile.NamedTemporaryFile() as cachefile:

commit e09af1d4876cb1295785db9abb5f3d2d10323097
Author: radhika <radhika at curoverse.com>
Date:   Thu Apr 21 14:22:46 2016 -0400

    8937: update arvados_testutil.py to skip passing resp_body to the writer when it is a boolean.

diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py
index b2cf436..71c9b17 100644
--- a/sdk/python/tests/arvados_testutil.py
+++ b/sdk/python/tests/arvados_testutil.py
@@ -85,7 +85,8 @@ class FakeCurl:
             self._headerfunction("HTTP/1.1 {} Status".format(self._resp_code))
             for k, v in self._resp_headers.iteritems():
                 self._headerfunction(k + ': ' + str(v))
-        self._writer(self._resp_body)
+        if type(self._resp_body) is not bool:
+            self._writer(self._resp_body)
 
     def close(self):
         pass

commit 6599824f22cdb45ab30657ac95071aa0beeee08b
Author: radhika <radhika at curoverse.com>
Date:   Thu Apr 21 10:13:15 2016 -0400

    8937: Return True (a bool, not the string "True") for HEAD requests in KeepClient. The tests in KeepClientRetryHeadTestCase are failing due to this and need to be worked on.

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 8d9cc3b..3c0ad6f 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -980,7 +980,7 @@ class KeepClient(object):
             self.block_cache.cap_cache()
         if loop.success():
             if method == "HEAD":
-                return "True"
+                return True
             else:
                 return blob
 
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 04752a6..5fbab7d 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -110,7 +110,7 @@ class KeepTestCase(run_test_server.TestCaseWithServers):
             locator,
             '^b9a772c7049325feb7130fff1f8333e9\+9',
             'wrong md5 hash from Keep.put for "test_head": ' + locator)
-        self.assertEqual("True", self.keep_client.head(locator))
+        self.assertEqual(True, self.keep_client.head(locator))
         self.assertEqual(self.keep_client.get(locator),
                          'test_head',
                          'wrong content from Keep.get for "test_head"')
@@ -827,7 +827,7 @@ class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
         self.assertEqual('foo', self.keepClient.get(locator))
         self.assertEqual(self.gateway_roots[0]+locator,
                          MockCurl.return_value.getopt(pycurl.URL))
-        self.assertEqual("True", self.keepClient.head(locator))
+        self.assertEqual(True, self.keepClient.head(locator))
 
     @mock.patch('pycurl.Curl')
     def test_get_with_gateway_hints_in_order(self, MockCurl):
@@ -893,7 +893,7 @@ class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
             code=200, body='foo', headers={'Content-Length': 3})
         self.mock_disks_and_gateways()
         locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K at xyzzy'
-        self.assertEqual("True", self.keepClient.head(locator))
+        self.assertEqual(True, self.keepClient.head(locator))
         self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                          MockCurl.return_value.getopt(pycurl.URL))
 
@@ -1014,7 +1014,7 @@ class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
 
 @tutil.skip_sleep
 class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
-    DEFAULT_EXPECT = "True"
+    DEFAULT_EXPECT = True
     DEFAULT_EXCEPTION = arvados.errors.KeepReadError
     HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K at xyzzy'
     TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

commit b6e5a10028551a1c2b1379c5c8ed039582434f25
Author: radhika <radhika at curoverse.com>
Date:   Wed Apr 20 18:28:19 2016 -0400

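    8937: return True from the Keep service HEAD handler, return "True" from KeepClient HEAD requests, and update tests accordingly.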

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 135d985..8d9cc3b 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -432,7 +432,7 @@ class KeepClient(object):
                 _logger.info("HEAD %s: %s bytes",
                          self._result['status_code'],
                          self._result.get('content-length'))
-                return str(self._result.get('content-length'))
+                return True
 
             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                          self._result['status_code'],
@@ -980,7 +980,7 @@ class KeepClient(object):
             self.block_cache.cap_cache()
         if loop.success():
             if method == "HEAD":
-                return "true"
+                return "True"
             else:
                 return blob
 
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index fa82dcc..04752a6 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -110,7 +110,7 @@ class KeepTestCase(run_test_server.TestCaseWithServers):
             locator,
             '^b9a772c7049325feb7130fff1f8333e9\+9',
             'wrong md5 hash from Keep.put for "test_head": ' + locator)
-        self.assertEqual("true", self.keep_client.head(locator))
+        self.assertEqual("True", self.keep_client.head(locator))
         self.assertEqual(self.keep_client.get(locator),
                          'test_head',
                          'wrong content from Keep.get for "test_head"')
@@ -827,7 +827,7 @@ class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
         self.assertEqual('foo', self.keepClient.get(locator))
         self.assertEqual(self.gateway_roots[0]+locator,
                          MockCurl.return_value.getopt(pycurl.URL))
-        self.assertEqual("true", self.keepClient.head(locator))
+        self.assertEqual("True", self.keepClient.head(locator))
 
     @mock.patch('pycurl.Curl')
     def test_get_with_gateway_hints_in_order(self, MockCurl):
@@ -893,7 +893,7 @@ class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
             code=200, body='foo', headers={'Content-Length': 3})
         self.mock_disks_and_gateways()
         locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K at xyzzy'
-        self.assertEqual("true", self.keepClient.head(locator))
+        self.assertEqual("True", self.keepClient.head(locator))
         self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                          MockCurl.return_value.getopt(pycurl.URL))
 
@@ -1014,7 +1014,7 @@ class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
 
 @tutil.skip_sleep
 class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
-    DEFAULT_EXPECT = "true"
+    DEFAULT_EXPECT = "True"
     DEFAULT_EXCEPTION = arvados.errors.KeepReadError
     HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K at xyzzy'
     TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

commit ec9df4864de8033b4efd8b1cecfd1875fefc303e
Author: radhika <radhika at curoverse.com>
Date:   Wed Apr 20 17:28:31 2016 -0400

    8937: bypass cache for all head requests.

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 4cd7dc3..135d985 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -199,20 +199,18 @@ class KeepBlockCache(object):
         with self._cache_lock:
             return self._get(locator)
 
-    def reserve_cache(self, locator, reserve=True):
+    def reserve_cache(self, locator):
         '''Reserve a cache slot for the specified locator,
         or return the existing slot.'''
         with self._cache_lock:
             n = self._get(locator)
             if n:
                 return n, False
-            elif reserve == True:
+            else:
                 # Add a new cache slot for the locator
                 n = KeepBlockCache.CacheSlot(locator)
                 self._cache.insert(0, n)
                 return n, True
-            else:
-                return None, False
 
 class Counter(object):
     def __init__(self, v=0):
@@ -433,11 +431,8 @@ class KeepClient(object):
             if method == "HEAD":
                 _logger.info("HEAD %s: %s bytes",
                          self._result['status_code'],
-                         self._headers.get('content-length'))
-                content_len = self._headers.get('content-length')
-                if content_len is None:
-                    content_len = self._result['body']
-                return str(content_len)
+                         self._result.get('content-length'))
+                return str(self._result.get('content-length'))
 
             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                          self._result['status_code'],
@@ -919,13 +914,11 @@ class KeepClient(object):
         self.get_counter.add(1)
 
         locator = KeepLocator(loc_s)
-        slot, first = self.block_cache.reserve_cache(locator.md5sum, True if method == "GET" else False)
-        if not first and slot is not None:
-            self.hits_counter.add(1)
-            v = slot.get()
-            if method == "HEAD":
-                return str(len(v))
-            else:
+        if method == "GET":
+            slot, first = self.block_cache.reserve_cache(locator.md5sum)
+            if not first:
+                self.hits_counter.add(1)
+                v = slot.get()
                 return v
 
         self.misses_counter.add(1)
@@ -986,7 +979,10 @@ class KeepClient(object):
             slot.set(blob)
             self.block_cache.cap_cache()
         if loop.success():
-            return blob
+            if method == "HEAD":
+                return "true"
+            else:
+                return blob
 
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 9e9b76b..fa82dcc 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -110,7 +110,7 @@ class KeepTestCase(run_test_server.TestCaseWithServers):
             locator,
             '^b9a772c7049325feb7130fff1f8333e9\+9',
             'wrong md5 hash from Keep.put for "test_head": ' + locator)
-        self.assertEqual('9', self.keep_client.head(locator))
+        self.assertEqual("true", self.keep_client.head(locator))
         self.assertEqual(self.keep_client.get(locator),
                          'test_head',
                          'wrong content from Keep.get for "test_head"')
@@ -827,7 +827,7 @@ class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
         self.assertEqual('foo', self.keepClient.get(locator))
         self.assertEqual(self.gateway_roots[0]+locator,
                          MockCurl.return_value.getopt(pycurl.URL))
-        self.assertEqual('3', self.keepClient.head(locator))
+        self.assertEqual("true", self.keepClient.head(locator))
 
     @mock.patch('pycurl.Curl')
     def test_get_with_gateway_hints_in_order(self, MockCurl):
@@ -893,7 +893,7 @@ class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
             code=200, body='foo', headers={'Content-Length': 3})
         self.mock_disks_and_gateways()
         locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K at xyzzy'
-        self.assertEqual(3, int(self.keepClient.head(locator)))
+        self.assertEqual("true", self.keepClient.head(locator))
         self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                          MockCurl.return_value.getopt(pycurl.URL))
 
@@ -1014,7 +1014,7 @@ class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
 
 @tutil.skip_sleep
 class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
-    DEFAULT_EXPECT = str(len(KeepClientRetryTestMixin.TEST_DATA))
+    DEFAULT_EXPECT = "true"
     DEFAULT_EXCEPTION = arvados.errors.KeepReadError
     HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K at xyzzy'
     TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

commit f22692a8fdf6610045db0c34c4827a0ebcb0ae0a
Author: radhika <radhika at curoverse.com>
Date:   Wed Apr 20 15:54:03 2016 -0400

    8937: add head request to python keep client.

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index cd39f83..4cd7dc3 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -199,19 +199,20 @@ class KeepBlockCache(object):
         with self._cache_lock:
             return self._get(locator)
 
-    def reserve_cache(self, locator):
+    def reserve_cache(self, locator, reserve=True):
         '''Reserve a cache slot for the specified locator,
         or return the existing slot.'''
         with self._cache_lock:
             n = self._get(locator)
             if n:
                 return n, False
-            else:
+            elif reserve == True:
                 # Add a new cache slot for the locator
                 n = KeepBlockCache.CacheSlot(locator)
                 self._cache.insert(0, n)
                 return n, True
-
+            else:
+                return None, False
 
 class Counter(object):
     def __init__(self, v=0):
@@ -374,10 +375,10 @@ class KeepClient(object):
             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
             return s
 
-        def get(self, locator, timeout=None):
+        def get(self, locator, method="GET", timeout=None):
             # locator is a KeepLocator object.
             url = self.root + str(locator)
-            _logger.debug("Request: GET %s", url)
+            _logger.debug("Request: %s %s", method, url)
             curl = self._get_user_agent()
             ok = None
             try:
@@ -391,7 +392,10 @@ class KeepClient(object):
                         '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+                    if method == "HEAD":
+                        curl.setopt(pycurl.NOBODY, True)
                     self._setcurltimeouts(curl, timeout)
+
                     try:
                         curl.perform()
                     except Exception as e:
@@ -402,6 +406,7 @@ class KeepClient(object):
                         'headers': self._headers,
                         'error': False,
                     }
+
                 ok = retry.check_http_response_success(self._result['status_code'])
                 if not ok:
                     self._result['error'] = arvados.errors.HttpError(
@@ -425,11 +430,21 @@ class KeepClient(object):
                 _logger.debug("Request fail: GET %s => %s: %s",
                               url, type(self._result['error']), str(self._result['error']))
                 return None
+            if method == "HEAD":
+                _logger.info("HEAD %s: %s bytes",
+                         self._result['status_code'],
+                         self._headers.get('content-length'))
+                content_len = self._headers.get('content-length')
+                if content_len is None:
+                    content_len = self._result['body']
+                return str(content_len)
+
             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                          self._result['status_code'],
                          len(self._result['body']),
                          t.msecs,
                          (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+
             if self.download_counter:
                 self.download_counter.add(len(self._result['body']))
             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
@@ -871,7 +886,14 @@ class KeepClient(object):
             return None
 
     @retry.retry_method
+    def head(self, loc_s, num_retries=None):
+        return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)
+
+    @retry.retry_method
     def get(self, loc_s, num_retries=None):
+        return self._get_or_head(loc_s, method="GET", num_retries=num_retries)
+
+    def _get_or_head(self, loc_s, method="GET", num_retries=None):
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
@@ -897,11 +919,14 @@ class KeepClient(object):
         self.get_counter.add(1)
 
         locator = KeepLocator(loc_s)
-        slot, first = self.block_cache.reserve_cache(locator.md5sum)
-        if not first:
+        slot, first = self.block_cache.reserve_cache(locator.md5sum, True if method == "GET" else False)
+        if not first and slot is not None:
             self.hits_counter.add(1)
             v = slot.get()
-            return v
+            if method == "HEAD":
+                return str(len(v))
+            else:
+                return v
 
         self.misses_counter.add(1)
 
@@ -951,14 +976,15 @@ class KeepClient(object):
                                for root in sorted_roots
                                if roots_map[root].usable()]
             for keep_service in services_to_try:
-                blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
+                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                 if blob is not None:
                     break
             loop.save_result((blob, len(services_to_try)))
 
         # Always cache the result, then return it if we succeeded.
-        slot.set(blob)
-        self.block_cache.cap_cache()
+        if method == "GET":
+            slot.set(blob)
+            self.block_cache.cap_cache()
         if loop.success():
             return blob
 
diff --git a/sdk/python/tests/keepstub.py b/sdk/python/tests/keepstub.py
index f074f8d..d79788c 100644
--- a/sdk/python/tests/keepstub.py
+++ b/sdk/python/tests/keepstub.py
@@ -121,6 +121,20 @@ class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
         self.wfile_bandwidth_write(self.server.store[datahash])
         self.server._do_delay('response_close')
 
+    def do_HEAD(self):
+        self.server._do_delay('response')
+        r = re.search(r'[0-9a-f]{32}', self.path)
+        if not r:
+            return self.send_response(422)
+        datahash = r.group(0)
+        if datahash not in self.server.store:
+            return self.send_response(404)
+        self.send_response(200)
+        self.send_header('Content-type', 'application/octet-stream')
+        self.send_header('Content-length', str(len(self.server.store[datahash])))
+        self.end_headers()
+        self.server._do_delay('response_close')
+
     def do_PUT(self):
         self.server._do_delay('request_body')
         # The comments at https://bugs.python.org/issue1491 implies that Python
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 5cba8cc..9e9b76b 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -104,6 +104,17 @@ class KeepTestCase(run_test_server.TestCaseWithServers):
             # Must be a string type
             self.keep_client.put({})
 
+    def test_KeepHeadTest(self):
+        locator = self.keep_client.put('test_head')
+        self.assertRegexpMatches(
+            locator,
+            '^b9a772c7049325feb7130fff1f8333e9\+9',
+            'wrong md5 hash from Keep.put for "test_head": ' + locator)
+        self.assertEqual('9', self.keep_client.head(locator))
+        self.assertEqual(self.keep_client.get(locator),
+                         'test_head',
+                         'wrong content from Keep.get for "test_head"')
+
 class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
     MAIN_SERVER = {}
     KEEP_SERVER = {'blob_signing_key': 'abcdefghijk0123456789',
@@ -318,6 +329,23 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
                 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                 int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
 
+    def test_head_timeout(self):
+        api_client = self.mock_keep_services(count=1)
+        force_timeout = socket.timeout("timed out")
+        with tutil.mock_keep_responses(force_timeout, 0) as mock:
+            keep_client = arvados.KeepClient(api_client=api_client)
+            with self.assertRaises(arvados.errors.KeepReadError):
+                keep_client.head('ffffffffffffffffffffffffffffffff')
+            self.assertEqual(
+                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
+                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
+            self.assertEqual(
+                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
+                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
+            self.assertEqual(
+                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
+                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
+
     def test_proxy_get_timeout(self):
         api_client = self.mock_keep_services(service_type='proxy', count=1)
         force_timeout = socket.timeout("timed out")
@@ -335,6 +363,23 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
                 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
 
+    def test_proxy_head_timeout(self):
+        # Same as test_head_timeout, but the only service is of type
+        # 'proxy', so the curl handle must carry DEFAULT_PROXY_TIMEOUT.
+        api_client = self.mock_keep_services(service_type='proxy', count=1)
+        force_timeout = socket.timeout("timed out")
+        # NOTE(review): assumes mock_keep_responses(body, *codes) makes the
+        # mocked request fail with force_timeout -- confirm in tutil.
+        with tutil.mock_keep_responses(force_timeout, 0) as mock:
+            keep_client = arvados.KeepClient(api_client=api_client)
+            with self.assertRaises(arvados.errors.KeepReadError):
+                keep_client.head('ffffffffffffffffffffffffffffffff')
+            # Connect timeout is compared in ms: DEFAULT_PROXY_TIMEOUT[0]
+            # * 1000 (presumably seconds -> milliseconds).
+            self.assertEqual(
+                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
+                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
+            # Low-speed abort settings come from DEFAULT_PROXY_TIMEOUT[1]/[2].
+            self.assertEqual(
+                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
+                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
+            self.assertEqual(
+                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
+                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
+
     def test_proxy_put_timeout(self):
         api_client = self.mock_keep_services(service_type='proxy', count=1)
         force_timeout = socket.timeout("timed out")
@@ -364,6 +409,9 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
     def test_get_error_with_no_services(self):
         self.check_no_services_error('get', arvados.errors.KeepReadError)
 
+    def test_head_error_with_no_services(self):
+        # head() is expected to fail with KeepReadError (the same type as
+        # get()); check_no_services_error presumably sets up a client with
+        # no Keep services -- per its name, confirm in the helper.
+        self.check_no_services_error('head', arvados.errors.KeepReadError)
+
     def test_put_error_with_no_services(self):
         self.check_no_services_error('put', arvados.errors.KeepWriteError)
 
@@ -383,6 +431,9 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
     def test_get_error_reflects_last_retry(self):
         self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)
 
+    def test_head_error_reflects_last_retry(self):
+        # Runs the shared last-retry error check for head(), expecting
+        # KeepReadError as for get(); what "reflects" means is defined by
+        # check_errors_from_last_retry -- see that helper.
+        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)
+
     def test_put_error_reflects_last_retry(self):
         self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)
 
@@ -477,6 +528,10 @@ class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
         self._test_probe_order_against_reference_set(
             lambda i: self.keep_client.get(self.hashes[i], num_retries=1))
 
+    def test_head_probe_order_against_reference_set(self):
+        # Reuses the reference-set probe-order check used for get(),
+        # issuing one head() per test hash with num_retries=1.
+        self._test_probe_order_against_reference_set(
+            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))
+
     def test_put_probe_order_against_reference_set(self):
         # copies=1 prevents the test from being sensitive to races
         # between writer threads.
@@ -687,6 +742,9 @@ class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
         with self.assertTakesGreater(self.TIMEOUT_TIME):
             with self.assertRaises(arvados.errors.KeepWriteError):
                 kc.put(self.DATA, copies=1, num_retries=0)
+        with self.assertTakesGreater(self.TIMEOUT_TIME):
+            with self.assertRaises(arvados.errors.KeepReadError) as e:
+                kc.head(loc, num_retries=0)
 
     def test_low_bandwidth_with_server_mid_delay_failure(self):
         kc = self.keepClient()
@@ -769,6 +827,7 @@ class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
         self.assertEqual('foo', self.keepClient.get(locator))
         self.assertEqual(self.gateway_roots[0]+locator,
                          MockCurl.return_value.getopt(pycurl.URL))
+        self.assertEqual('3', self.keepClient.head(locator))
 
     @mock.patch('pycurl.Curl')
     def test_get_with_gateway_hints_in_order(self, MockCurl):
@@ -795,6 +854,30 @@ class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
                 r'keep0x')
 
     @mock.patch('pycurl.Curl')
+    def test_head_with_gateway_hints_in_order(self, MockCurl):
+        # Every mocked service answers 404, so head() must raise
+        # NotFoundError; the test then checks which URLs were probed and in
+        # what order (hinted gateways first, then disk services).
+        gateways = 4
+        disks = 3
+        mocks = [
+            tutil.FakeCurl.make(code=404, body='')
+            for _ in range(gateways+disks)
+        ]
+        # Each new Curl handle is taken from this queue, so mocks[i] records
+        # the i-th request made.
+        MockCurl.side_effect = tutil.queue_with(mocks)
+        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
+        # Append one K@<gateway uuid> hint per gateway to the bare locator.
+        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
+                           ['K@'+gw['uuid'] for gw in self.gateways])
+        with self.assertRaises(arvados.errors.NotFoundError):
+            self.keepClient.head(locator)
+        # Gateways are tried first, in the order given.
+        for i, root in enumerate(self.gateway_roots):
+            self.assertEqual(root+locator,
+                             mocks[i].getopt(pycurl.URL))
+        # Disk services are tried next.
+        for i in range(gateways, gateways+disks):
+            self.assertRegexpMatches(
+                mocks[i].getopt(pycurl.URL),
+                r'keep0x')
+    @mock.patch('pycurl.Curl')
     def test_get_with_remote_proxy_hint(self, MockCurl):
         MockCurl.return_value = tutil.FakeCurl.make(
             code=200, body='foo', headers={'Content-Length': 3})
@@ -804,6 +887,16 @@ class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
         self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                          MockCurl.return_value.getopt(pycurl.URL))
 
+    @mock.patch('pycurl.Curl')
+    def test_head_with_remote_proxy_hint(self, MockCurl):
+        # A +K@<cluster> hint must route the HEAD request to that cluster's
+        # keep proxy (keep.xyzzy.arvadosapi.com); head() should report the
+        # 3-byte size taken from the mocked Content-Length.
+        MockCurl.return_value = tutil.FakeCurl.make(
+            code=200, body='foo', headers={'Content-Length': 3})
+        self.mock_disks_and_gateways()
+        # The hint is 'K@xyzzy'; without the '@' it is not a proxy hint.
+        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
+        self.assertEqual(3, int(self.keepClient.head(locator)))
+        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
+                         MockCurl.return_value.getopt(pycurl.URL))
+
 
 class KeepClientRetryTestMixin(object):
     # Testing with a local Keep store won't exercise the retry behavior.
@@ -919,6 +1012,43 @@ class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
                 (self.DEFAULT_EXPECT, 200)):
             self.check_success(locator=self.HINTED_LOCATOR)
 
+@tutil.skip_sleep
+class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
+    # Retry-behaviour tests for KeepClient.head(), parallel to
+    # KeepClientRetryGetTestCase. A successful head() is expected to return
+    # the length of TEST_DATA as a string.
+    DEFAULT_EXPECT = str(len(KeepClientRetryTestMixin.TEST_DATA))
+    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
+    # 'K@xyzzy' hint: the '@' is required for this to be parsed as a hint.
+    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
+    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
+
+    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
+                   *args, **kwargs):
+        # Hook used by the mixin: perform the operation under test.
+        return self.new_client().head(locator, *args, **kwargs)
+
+    def test_specific_exception_when_not_found(self):
+        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
+            self.check_exception(arvados.errors.NotFoundError, num_retries=3)
+
+    def test_general_exception_with_mixed_errors(self):
+        # head should raise a NotFoundError if no server returns the block,
+        # and a high threshold of servers report that it's not found.
+        # This test rigs up 50/50 disagreement between two servers, and
+        # checks that it does not become a NotFoundError.
+        client = self.new_client()
+        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
+            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
+                client.head(self.HINTED_LOCATOR)
+            self.assertNotIsInstance(
+                exc_check.exception, arvados.errors.NotFoundError,
+                "mixed errors raised NotFoundError")
+
+    def test_hint_server_can_succeed_without_retries(self):
+        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
+            self.check_success(locator=self.HINTED_LOCATOR)
+
+    def test_try_next_server_after_timeout(self):
+        with tutil.mock_keep_responses(
+                (socket.timeout("timed out"), 200),
+                (self.DEFAULT_EXPECT, 200)):
+            self.check_success(locator=self.HINTED_LOCATOR)
 
 @tutil.skip_sleep
 class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list