[ARVADOS] created: dd5fd1e84119fe1b5bd120e17d77448917c4e277

git at public.curoverse.com git at public.curoverse.com
Wed Nov 11 17:36:59 EST 2015

        at  dd5fd1e84119fe1b5bd120e17d77448917c4e277 (commit)

commit dd5fd1e84119fe1b5bd120e17d77448917c4e277
Author: Brett Smith <brett at curoverse.com>
Date:   Wed Nov 11 17:08:39 2015 -0500

    7696: Improve PySDK KeepClient.ThreadLimiter.
    * Move the calculation of how many threads to allow into the class.
    * Teach it to handle cases where max_replicas_per_service is known and
      greater than 1.  This will never happen today, but is an anticipated
      future change.
    * Update docstrings to reflect current reality.
    These are all changes I made while debugging the previous race
    condition.

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 2b718d7..c297737 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -2,6 +2,7 @@ import cStringIO
 import datetime
 import hashlib
 import logging
+import math
 import os
 import pycurl
 import Queue
@@ -219,27 +220,34 @@ class KeepClient(object):
     DEFAULT_PROXY_TIMEOUT = (20, 300)
     class ThreadLimiter(object):
-        """
-        Limit the number of threads running at a given time to
-        {desired successes} minus {successes reported}. When successes
-        reported == desired, wake up the remaining threads and tell
-        them to quit.
+        """Limit the number of threads writing to Keep at once.
+        This ensures that only a number of writer threads that could
+        potentially achieve the desired replication level run at once.
+        Once the desired replication level is achieved, queued threads
+        are instructed not to run.
         Should be used in a "with" block.
-        def __init__(self, todo):
+        def __init__(self, want_copies, max_service_replicas):
             self._started = 0
-            self._todo = todo
+            self._want_copies = want_copies
             self._done = 0
             self._response = None
             self._start_lock = threading.Condition()
-            self._todo_lock = threading.Semaphore(todo)
+            if (not max_service_replicas) or (max_service_replicas >= want_copies):
+                max_threads = 1
+            else:
+                max_threads = math.ceil(float(want_copies) / max_service_replicas)
+            _logger.debug("Limiter max threads is %d", max_threads)
+            self._todo_lock = threading.Semaphore(max_threads)
             self._done_lock = threading.Lock()
             self._local = threading.local()
+            self._local.sequence = None
         def __enter__(self):
-            if getattr(self._local, 'sequence', None) is not None:
+            if self._local.sequence is not None:
                 # If the calling thread has used set_sequence(N), then
                 # we wait here until N other threads have started.
                 while self._started < self._local.sequence:
@@ -258,34 +266,28 @@ class KeepClient(object):
         def shall_i_proceed(self):
-            Return true if the current thread should do stuff. Return
-            false if the current thread should just stop.
+            Return true if the current thread should write to Keep.
+            Return false otherwise.
             with self._done_lock:
-                return (self._done < self._todo)
+                return (self._done < self._want_copies)
         def save_response(self, response_body, replicas_stored):
             Records a response body (a locator, possibly signed) returned by
-            the Keep server.  It is not necessary to save more than
-            one response, since we presume that any locator returned
-            in response to a successful request is valid.
+            the Keep server, and the number of replicas it stored.
             with self._done_lock:
                 self._done += replicas_stored
                 self._response = response_body
         def response(self):
-            """
-            Returns the body from the response to a PUT request.
-            """
+            """Return the body from the response to a PUT request."""
             with self._done_lock:
                 return self._response
         def done(self):
-            """
-            Return how many successes were reported.
-            """
+            """Return the total number of replicas successfully stored."""
             with self._done_lock:
                 return self._done
@@ -965,7 +967,8 @@ class KeepClient(object):
-            thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
+            thread_limiter = KeepClient.ThreadLimiter(
+                copies, self.max_replicas_per_service)
             threads = []
             for service_root, ks in [(root, roots_map[root])
                                      for root in sorted_roots]:

commit 49ecbe5e9fc5b8793b8f3e7615007d82804f963c
Author: Brett Smith <brett at curoverse.com>
Date:   Wed Nov 11 16:50:18 2015 -0500

    7696: PySDK determines max_replicas_per_service after querying services.
    Because max_replicas_per_service was set to 1 in the case where
    KeepClient was instantiated with no direct information about available
    Keep services, and because ThreadLimiter was being instantiated before
    querying available Keep services (via map_new_services), the first
    Keep request to talk to non-disk services would let multiple threads
    run at once.  This fixes that race condition, and adds a test that was
    triggering it semi-reliably.

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index b3d64a4..2b718d7 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -645,6 +645,7 @@ class KeepClient(object):
             self.put = self.local_store_put
             self.num_retries = num_retries
+            self.max_replicas_per_service = None
             if proxy:
                 if not proxy.endswith('/'):
                     proxy += '/'
@@ -658,7 +659,6 @@ class KeepClient(object):
                 self._writable_services = self._keep_services
                 self.using_proxy = True
                 self._static_services_list = True
-                self.max_replicas_per_service = None
                 # It's important to avoid instantiating an API client
                 # unless we actually need one, for testing's sake.
@@ -671,7 +671,6 @@ class KeepClient(object):
                 self._writable_services = None
                 self.using_proxy = None
                 self._static_services_list = False
-                self.max_replicas_per_service = 1
     def current_timeout(self, attempt_number):
         """Return the appropriate timeout to use for this client.
@@ -955,10 +954,8 @@ class KeepClient(object):
         # Tell the proxy how many copies we want it to store
         headers['X-Keep-Desired-Replication'] = str(copies)
         roots_map = {}
-        thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
-        thread_sequence = 0
         for tries_left in loop:
                 sorted_roots = self.map_new_services(
@@ -968,6 +965,7 @@ class KeepClient(object):
+            thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
             threads = []
             for service_root, ks in [(root, roots_map[root])
                                      for root in sorted_roots]:
@@ -980,10 +978,9 @@ class KeepClient(object):
-                    thread_sequence=thread_sequence)
+                    thread_sequence=len(threads))
-                thread_sequence += 1
             for t in threads:
             loop.save_result((thread_limiter.done() >= copies, len(threads)))
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index a5d9925..ada0dac 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -404,6 +404,19 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
             actual = keep_client.put(body, copies=1)
         self.assertEqual(pdh, actual)
+    def test_oddball_service_writer_count(self):
+        body = 'oddball service writer count'
+        pdh = tutil.str_keep_locator(body)
+        api_client = self.mock_keep_services(service_type='fancynewblobstore',
+                                             count=4)
+        headers = {'x-keep-replicas-stored': 3}
+        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
+                                       **headers) as req_mock:
+            keep_client = arvados.KeepClient(api_client=api_client)
+            actual = keep_client.put(body, copies=2)
+        self.assertEqual(pdh, actual)
+        self.assertEqual(1, req_mock.call_count)
 class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):

commit 4fd735476b69fd5e5d5c3162a752a8a5022d8d18
Author: Brett Smith <brett at curoverse.com>
Date:   Wed Nov 11 12:17:46 2015 -0500

    7696: PySDK KeepClient uses all service types.
    Filter out gateway services from the list of usable services, rather
    than selecting only disk and proxy types.

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index b3ccc3e..b3d64a4 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -652,12 +652,13 @@ class KeepClient(object):
                 self._gateway_services = {}
                 self._keep_services = [{
                     'uuid': 'proxy',
+                    'service_type': 'proxy',
                     '_service_root': proxy,
                 self._writable_services = self._keep_services
                 self.using_proxy = True
                 self._static_services_list = True
-                self.max_replicas_per_service = 1
+                self.max_replicas_per_service = None
                 # It's important to avoid instantiating an API client
                 # unless we actually need one, for testing's sake.
@@ -687,6 +688,10 @@ class KeepClient(object):
         t = self.proxy_timeout if self.using_proxy else self.timeout
         return (t[0] * (1 << attempt_number), t[1])
+    def _any_nondisk_services(self, service_list):
+        return any(ks.get('service_type', 'disk') != 'disk'
+                   for ks in service_list)
     def build_services_list(self, force_rebuild=False):
         if (self._static_services_list or
               (self._keep_services and not force_rebuild)):
@@ -697,12 +702,16 @@ class KeepClient(object):
             except Exception:  # API server predates Keep services.
                 keep_services = self.api_client.keep_disks().list()
-            accessible = keep_services.execute().get('items')
-            if not accessible:
+            # Gateway services are only used when specified by UUID,
+            # so there's nothing to gain by filtering them by
+            # service_type.
+            self._gateway_services = {ks['uuid']: ks for ks in
+                                      keep_services.execute()['items']}
+            if not self._gateway_services:
                 raise arvados.errors.NoKeepServersError()
             # Precompute the base URI for each service.
-            for r in accessible:
+            for r in self._gateway_services.itervalues():
                 host = r['service_host']
                 if not host.startswith('[') and host.find(':') >= 0:
                     # IPv6 URIs must be formatted like http://[::1]:80/...
@@ -712,27 +721,19 @@ class KeepClient(object):
-            # Gateway services are only used when specified by UUID,
-            # so there's nothing to gain by filtering them by
-            # service_type.
-            self._gateway_services = {ks.get('uuid'): ks for ks in accessible}
             self._keep_services = [
-                ks for ks in accessible
-                if ks.get('service_type') in ['disk', 'proxy']]
-            self._writable_services = [
-                ks for ks in accessible
-                if (ks.get('service_type') in ['disk', 'proxy']) and (True != ks.get('read_only'))]
-            _logger.debug(str(self._keep_services))
-            self.using_proxy = any(ks.get('service_type') == 'proxy'
-                                   for ks in self._keep_services)
+                ks for ks in self._gateway_services.itervalues()
+                if not ks.get('service_type', '').startswith('gateway:')]
+            self._writable_services = [ks for ks in self._keep_services
+                                       if not ks.get('read_only')]
             # For disk type services, max_replicas_per_service is 1
-            # It is unknown or unlimited for non-disk typed services.
-            for ks in accessible:
-                if ('disk' != ks.get('service_type')) and (not ks.get('read_only')):
-                    self.max_replicas_per_service = None
+            # It is unknown (unlimited) for other service types.
+            if self._any_nondisk_services(self._writable_services):
+                self.max_replicas_per_service = None
+            else:
+                self.max_replicas_per_service = 1
     def _service_weight(self, data_hash, service_uuid):
         """Compute the weight of a Keep service endpoint for a data
@@ -769,7 +770,8 @@ class KeepClient(object):
         # in that order.
         use_services = self._keep_services
         if need_writable:
-          use_services = self._writable_services
+            use_services = self._writable_services
+        self.using_proxy = self._any_nondisk_services(use_services)
             svc['_service_root'] for svc in sorted(
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index ab57ed5..a5d9925 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -387,6 +387,23 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
         self.assertEqual(0, len(exc_check.exception.request_errors()))
+    def test_oddball_service_get(self):
+        body = 'oddball service get'
+        api_client = self.mock_keep_services(service_type='fancynewblobstore')
+        with tutil.mock_keep_responses(body, 200):
+            keep_client = arvados.KeepClient(api_client=api_client)
+            actual = keep_client.get(tutil.str_keep_locator(body))
+        self.assertEqual(body, actual)
+    def test_oddball_service_put(self):
+        body = 'oddball service put'
+        pdh = tutil.str_keep_locator(body)
+        api_client = self.mock_keep_services(service_type='fancynewblobstore')
+        with tutil.mock_keep_responses(pdh, 200):
+            keep_client = arvados.KeepClient(api_client=api_client)
+            actual = keep_client.put(body, copies=1)
+        self.assertEqual(pdh, actual)
 class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):

commit 0bf65e8de6c9b2986e2b2d968afe3aff70d390fc
Author: Brett Smith <brett at curoverse.com>
Date:   Wed Nov 11 12:18:46 2015 -0500

    7696: Clean imports in PySDK arvados.keep module.

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 8ed86fd..b3ccc3e 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -1,28 +1,15 @@
-import bz2
+import cStringIO
 import datetime
-import fcntl
-import functools
-import gflags
 import hashlib
-import json
 import logging
 import os
-import pprint
 import pycurl
 import Queue
 import re
 import socket
 import ssl
-import string
-import cStringIO
-import subprocess
-import sys
 import threading
-import time
 import timer
-import types
-import UserDict
-import zlib
 import arvados
 import arvados.config as config

commit 113d76e501d66423aa0ce442f0331aca36fd3726
Author: Brett Smith <brett at curoverse.com>
Date:   Wed Nov 11 10:06:51 2015 -0500

    7696: Refactor locator builder method in PySDK tests.

diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py
index ea318c6..b2cf436 100644
--- a/sdk/python/tests/arvados_testutil.py
+++ b/sdk/python/tests/arvados_testutil.py
@@ -47,6 +47,8 @@ def mock_api_responses(api_client, body, codes, headers={}):
     return mock.patch.object(api_client._http, 'request', side_effect=queue_with((
         (fake_httplib2_response(code, **headers), body) for code in codes)))
+def str_keep_locator(s):
+    return '{}+{}'.format(hashlib.md5(s).hexdigest(), len(s))
 class FakeCurl:
@@ -126,8 +128,7 @@ class MockStreamReader(object):
     def __init__(self, name='.', *data):
         self._name = name
         self._data = ''.join(data)
-        self._data_locators = ['{}+{}'.format(hashlib.md5(d).hexdigest(),
-                                              len(d)) for d in data]
+        self._data_locators = [str_keep_locator(d) for d in data]
         self.num_retries = 0
     def name(self):
diff --git a/sdk/python/tests/manifest_examples.py b/sdk/python/tests/manifest_examples.py
index 2d8e475..8f0abd2 100644
--- a/sdk/python/tests/manifest_examples.py
+++ b/sdk/python/tests/manifest_examples.py
@@ -1,6 +1,5 @@
 import arvados
 import arvados_testutil as tutil
-import hashlib
 class ManifestExamples(object):
     def make_manifest(self,
@@ -9,8 +8,7 @@ class ManifestExamples(object):
         datablip = 'x' * bytes_per_block
-        data_loc = '{}+{}'.format(hashlib.md5(datablip).hexdigest(),
-                                  bytes_per_block)
+        data_loc = tutil.str_keep_locator(datablip)
         with tutil.mock_keep_responses(data_loc, 200):
             coll = arvados.CollectionWriter()
             for si in range(0, streams):
diff --git a/sdk/python/tests/test_arv_ls.py b/sdk/python/tests/test_arv_ls.py
index 90bbacf..664b57f 100644
--- a/sdk/python/tests/test_arv_ls.py
+++ b/sdk/python/tests/test_arv_ls.py
@@ -1,7 +1,6 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
-import hashlib
 import io
 import random
@@ -11,6 +10,8 @@ import arvados.errors as arv_error
 import arvados.commands.ls as arv_ls
 import run_test_server
+from arvados_testutil import str_keep_locator
 class ArvLsTestCase(run_test_server.TestCaseWithServers):
     FAKE_UUID = 'zzzzz-4zz18-12345abcde12345'
@@ -24,8 +25,7 @@ class ArvLsTestCase(run_test_server.TestCaseWithServers):
     def mock_api_for_manifest(self, manifest_lines, uuid=FAKE_UUID):
         manifest_text = self.newline_join(manifest_lines)
-        pdh = '{}+{}'.format(hashlib.md5(manifest_text).hexdigest(),
-                             len(manifest_text))
+        pdh = str_keep_locator(manifest_text)
         coll_info = {'uuid': uuid,
                      'portable_data_hash': pdh,
                      'manifest_text': manifest_text}
diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py
index 330dd44..ea86614 100644
--- a/sdk/python/tests/test_arvfile.py
+++ b/sdk/python/tests/test_arvfile.py
@@ -6,7 +6,6 @@ import io
 import mock
 import os
 import unittest
-import hashlib
 import time
 import arvados
@@ -30,7 +29,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
             return self.blocks.get(locator)
         def put(self, data, num_retries=None):
-            pdh = "%s+%i" % (hashlib.md5(data).hexdigest(), len(data))
+            pdh = tutil.str_keep_locator(data)
             self.blocks[pdh] = str(data)
             return pdh
@@ -453,7 +452,7 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase):
         n = 0
         blocks = {}
         for d in ['01234', '34567', '67890']:
-            loc = '{}+{}'.format(hashlib.md5(d).hexdigest(), len(d))
+            loc = tutil.str_keep_locator(d)
             blocks[loc] = d
             stream.append(Range(loc, n, len(d)))
             n += len(d)
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index ac7dd1b..bc3c0bf 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -4,7 +4,6 @@
 import arvados
 import copy
-import hashlib
 import mock
 import os
 import pprint
@@ -395,7 +394,7 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
     def test_write_directory_tree_with_zero_recursion(self):
         cwriter = arvados.CollectionWriter(self.api_client)
         content = 'd1/d2/f3d1/f2f1'
-        blockhash = hashlib.md5(content).hexdigest() + '+' + str(len(content))
+        blockhash = tutil.str_keep_locator(content)
             self.build_directory_tree(['f1', 'd1/f2', 'd1/d2/f3']),
@@ -739,7 +738,7 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
             self.assertEqual('.', writer.current_stream_name())
             self.assertEqual('out', writer.current_file_name())
             out_file.write('test data')
-            data_loc = hashlib.md5('test data').hexdigest() + '+9'
+            data_loc = tutil.str_keep_locator('test data')
         self.assertTrue(out_file.closed, "writer file not closed after context")
         self.assertRaises(ValueError, out_file.write, 'extra text')
         with self.mock_keep(data_loc, 200) as keep_mock:
@@ -751,15 +750,15 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
         writer = arvados.CollectionWriter(client)
         with writer.open('six') as out_file:
             out_file.writelines(['12', '34', '56'])
-            data_loc = hashlib.md5('123456').hexdigest() + '+6'
+            data_loc = tutil.str_keep_locator('123456')
         with self.mock_keep(data_loc, 200) as keep_mock:
             self.assertEqual(". {} 0:6:six\n".format(data_loc),
     def test_open_flush(self):
         client = self.api_client_mock()
-        data_loc1 = hashlib.md5('flush1').hexdigest() + '+6'
-        data_loc2 = hashlib.md5('flush2').hexdigest() + '+6'
+        data_loc1 = tutil.str_keep_locator('flush1')
+        data_loc2 = tutil.str_keep_locator('flush2')
         with self.mock_keep((data_loc1, 200), (data_loc2, 200)) as keep_mock:
             writer = arvados.CollectionWriter(client)
             with writer.open('flush_test') as out_file:
@@ -777,15 +776,15 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
         with writer.open('.', '2') as out_file:
-        data_loc = hashlib.md5('1st2nd').hexdigest() + '+6'
+        data_loc = tutil.str_keep_locator('1st2nd')
         with self.mock_keep(data_loc, 200) as keep_mock:
             self.assertEqual(". {} 0:3:1 3:3:2\n".format(data_loc),
     def test_two_opens_two_streams(self):
         client = self.api_client_mock()
-        data_loc1 = hashlib.md5('file').hexdigest() + '+4'
-        data_loc2 = hashlib.md5('indir').hexdigest() + '+5'
+        data_loc1 = tutil.str_keep_locator('file')
+        data_loc2 = tutil.str_keep_locator('indir')
         with self.mock_keep((data_loc1, 200), (data_loc2, 200)) as keep_mock:
             writer = arvados.CollectionWriter(client)
             with writer.open('file') as out_file:
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 9046892..ab57ed5 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -368,7 +368,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
     def test_put_error_does_not_include_successful_puts(self):
         data = 'partial failure test'
-        data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
+        data_loc = tutil.str_keep_locator(data)
         api_client = self.mock_keep_services(count=3)
         with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
                 self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
@@ -378,7 +378,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
     def test_proxy_put_with_no_writable_services(self):
         data = 'test with no writable services'
-        data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
+        data_loc = tutil.str_keep_locator(data)
         api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
         with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
                 self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
@@ -518,7 +518,7 @@ class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
     def check_64_zeros_error_order(self, verb, exc_class):
         data = '0' * 64
         if verb == 'get':
-            data = hashlib.md5(data).hexdigest() + '+1234'
+            data = tutil.str_keep_locator(data)
         # Arbitrary port number:
         aport = random.randint(1024,65535)
         api_client = self.mock_keep_services(service_port=aport, count=self.services)



More information about the arvados-commits mailing list