[ARVADOS] created: dd5fd1e84119fe1b5bd120e17d77448917c4e277
git at public.curoverse.com
git at public.curoverse.com
Wed Nov 11 17:36:59 EST 2015
at dd5fd1e84119fe1b5bd120e17d77448917c4e277 (commit)
commit dd5fd1e84119fe1b5bd120e17d77448917c4e277
Author: Brett Smith <brett at curoverse.com>
Date: Wed Nov 11 17:08:39 2015 -0500
7696: Improve PySDK KeepClient.ThreadLimiter.
* Move the calculation of how many threads to allow into the class.
* Teach it to handle cases where max_replicas_per_service is known and
  greater than 1. This will never happen today, but is an anticipated
  improvement.
* Update docstrings to reflect current reality.
These are all changes I made while debugging the race condition fixed
in the previous commit (49ecbe5).
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 2b718d7..c297737 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -2,6 +2,7 @@ import cStringIO
import datetime
import hashlib
import logging
+import math
import os
import pycurl
import Queue
@@ -219,27 +220,34 @@ class KeepClient(object):
DEFAULT_PROXY_TIMEOUT = (20, 300)
class ThreadLimiter(object):
- """
- Limit the number of threads running at a given time to
- {desired successes} minus {successes reported}. When successes
- reported == desired, wake up the remaining threads and tell
- them to quit.
+ """Limit the number of threads writing to Keep at once.
+
+ This ensures that only a number of writer threads that could
+ potentially achieve the desired replication level run at once.
+ Once the desired replication level is achieved, queued threads
+ are instructed not to run.
Should be used in a "with" block.
"""
- def __init__(self, todo):
+ def __init__(self, want_copies, max_service_replicas):
self._started = 0
- self._todo = todo
+ self._want_copies = want_copies
self._done = 0
self._response = None
self._start_lock = threading.Condition()
- self._todo_lock = threading.Semaphore(todo)
+ if (not max_service_replicas) or (max_service_replicas >= want_copies):
+ max_threads = 1
+ else:
+ max_threads = math.ceil(float(want_copies) / max_service_replicas)
+ _logger.debug("Limiter max threads is %d", max_threads)
+ self._todo_lock = threading.Semaphore(max_threads)
self._done_lock = threading.Lock()
self._local = threading.local()
+ self._local.sequence = None
def __enter__(self):
self._start_lock.acquire()
- if getattr(self._local, 'sequence', None) is not None:
+ if self._local.sequence is not None:
# If the calling thread has used set_sequence(N), then
# we wait here until N other threads have started.
while self._started < self._local.sequence:
@@ -258,34 +266,28 @@ class KeepClient(object):
def shall_i_proceed(self):
"""
- Return true if the current thread should do stuff. Return
- false if the current thread should just stop.
+ Return true if the current thread should write to Keep.
+ Return false otherwise.
"""
with self._done_lock:
- return (self._done < self._todo)
+ return (self._done < self._want_copies)
def save_response(self, response_body, replicas_stored):
"""
Records a response body (a locator, possibly signed) returned by
- the Keep server. It is not necessary to save more than
- one response, since we presume that any locator returned
- in response to a successful request is valid.
+ the Keep server, and the number of replicas it stored.
"""
with self._done_lock:
self._done += replicas_stored
self._response = response_body
def response(self):
- """
- Returns the body from the response to a PUT request.
- """
+ """Return the body from the response to a PUT request."""
with self._done_lock:
return self._response
def done(self):
- """
- Return how many successes were reported.
- """
+ """Return the total number of replicas successfully stored."""
with self._done_lock:
return self._done
@@ -965,7 +967,8 @@ class KeepClient(object):
loop.save_result(error)
continue
- thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
+ thread_limiter = KeepClient.ThreadLimiter(
+ copies, self.max_replicas_per_service)
threads = []
for service_root, ks in [(root, roots_map[root])
for root in sorted_roots]:
commit 49ecbe5e9fc5b8793b8f3e7615007d82804f963c
Author: Brett Smith <brett at curoverse.com>
Date: Wed Nov 11 16:50:18 2015 -0500
7696: PySDK determines max_replicas_per_service after querying services.
Because max_replicas_per_service was set to 1 in the case where
KeepClient was instantiated with no direct information about available
Keep services, and because ThreadLimiter was being instantiated before
querying available Keep services (via map_new_services), the first
Keep request to talk to non-disk services would let multiple threads
run at once. This fixes that race condition, and adds a test that was
triggering it semi-reliably.
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index b3d64a4..2b718d7 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -645,6 +645,7 @@ class KeepClient(object):
self.put = self.local_store_put
else:
self.num_retries = num_retries
+ self.max_replicas_per_service = None
if proxy:
if not proxy.endswith('/'):
proxy += '/'
@@ -658,7 +659,6 @@ class KeepClient(object):
self._writable_services = self._keep_services
self.using_proxy = True
self._static_services_list = True
- self.max_replicas_per_service = None
else:
# It's important to avoid instantiating an API client
# unless we actually need one, for testing's sake.
@@ -671,7 +671,6 @@ class KeepClient(object):
self._writable_services = None
self.using_proxy = None
self._static_services_list = False
- self.max_replicas_per_service = 1
def current_timeout(self, attempt_number):
"""Return the appropriate timeout to use for this client.
@@ -955,10 +954,8 @@ class KeepClient(object):
# Tell the proxy how many copies we want it to store
headers['X-Keep-Desired-Replication'] = str(copies)
roots_map = {}
- thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
loop = retry.RetryLoop(num_retries, self._check_loop_result,
backoff_start=2)
- thread_sequence = 0
for tries_left in loop:
try:
sorted_roots = self.map_new_services(
@@ -968,6 +965,7 @@ class KeepClient(object):
loop.save_result(error)
continue
+ thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
threads = []
for service_root, ks in [(root, roots_map[root])
for root in sorted_roots]:
@@ -980,10 +978,9 @@ class KeepClient(object):
service_root=service_root,
thread_limiter=thread_limiter,
timeout=self.current_timeout(num_retries-tries_left),
- thread_sequence=thread_sequence)
+ thread_sequence=len(threads))
t.start()
threads.append(t)
- thread_sequence += 1
for t in threads:
t.join()
loop.save_result((thread_limiter.done() >= copies, len(threads)))
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index a5d9925..ada0dac 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -404,6 +404,19 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
actual = keep_client.put(body, copies=1)
self.assertEqual(pdh, actual)
+ def test_oddball_service_writer_count(self):
+ body = 'oddball service writer count'
+ pdh = tutil.str_keep_locator(body)
+ api_client = self.mock_keep_services(service_type='fancynewblobstore',
+ count=4)
+ headers = {'x-keep-replicas-stored': 3}
+ with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
+ **headers) as req_mock:
+ keep_client = arvados.KeepClient(api_client=api_client)
+ actual = keep_client.put(body, copies=2)
+ self.assertEqual(pdh, actual)
+ self.assertEqual(1, req_mock.call_count)
+
@tutil.skip_sleep
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
commit 4fd735476b69fd5e5d5c3162a752a8a5022d8d18
Author: Brett Smith <brett at curoverse.com>
Date: Wed Nov 11 12:17:46 2015 -0500
7696: PySDK KeepClient uses all service types.
Filter out gateway services from the list of usable services, rather
than selecting only disk and proxy types.
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index b3ccc3e..b3d64a4 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -652,12 +652,13 @@ class KeepClient(object):
self._gateway_services = {}
self._keep_services = [{
'uuid': 'proxy',
+ 'service_type': 'proxy',
'_service_root': proxy,
}]
self._writable_services = self._keep_services
self.using_proxy = True
self._static_services_list = True
- self.max_replicas_per_service = 1
+ self.max_replicas_per_service = None
else:
# It's important to avoid instantiating an API client
# unless we actually need one, for testing's sake.
@@ -687,6 +688,10 @@ class KeepClient(object):
t = self.proxy_timeout if self.using_proxy else self.timeout
return (t[0] * (1 << attempt_number), t[1])
+ def _any_nondisk_services(self, service_list):
+ return any(ks.get('service_type', 'disk') != 'disk'
+ for ks in service_list)
+
def build_services_list(self, force_rebuild=False):
if (self._static_services_list or
(self._keep_services and not force_rebuild)):
@@ -697,12 +702,16 @@ class KeepClient(object):
except Exception: # API server predates Keep services.
keep_services = self.api_client.keep_disks().list()
- accessible = keep_services.execute().get('items')
- if not accessible:
+ # Gateway services are only used when specified by UUID,
+ # so there's nothing to gain by filtering them by
+ # service_type.
+ self._gateway_services = {ks['uuid']: ks for ks in
+ keep_services.execute()['items']}
+ if not self._gateway_services:
raise arvados.errors.NoKeepServersError()
# Precompute the base URI for each service.
- for r in accessible:
+ for r in self._gateway_services.itervalues():
host = r['service_host']
if not host.startswith('[') and host.find(':') >= 0:
# IPv6 URIs must be formatted like http://[::1]:80/...
@@ -712,27 +721,19 @@ class KeepClient(object):
host,
r['service_port'])
- # Gateway services are only used when specified by UUID,
- # so there's nothing to gain by filtering them by
- # service_type.
- self._gateway_services = {ks.get('uuid'): ks for ks in accessible}
_logger.debug(str(self._gateway_services))
-
self._keep_services = [
- ks for ks in accessible
- if ks.get('service_type') in ['disk', 'proxy']]
- self._writable_services = [
- ks for ks in accessible
- if (ks.get('service_type') in ['disk', 'proxy']) and (True != ks.get('read_only'))]
- _logger.debug(str(self._keep_services))
-
- self.using_proxy = any(ks.get('service_type') == 'proxy'
- for ks in self._keep_services)
+ ks for ks in self._gateway_services.itervalues()
+ if not ks.get('service_type', '').startswith('gateway:')]
+ self._writable_services = [ks for ks in self._keep_services
+ if not ks.get('read_only')]
+
# For disk type services, max_replicas_per_service is 1
- # It is unknown or unlimited for non-disk typed services.
- for ks in accessible:
- if ('disk' != ks.get('service_type')) and (not ks.get('read_only')):
- self.max_replicas_per_service = None
+ # It is unknown (unlimited) for other service types.
+ if self._any_nondisk_services(self._writable_services):
+ self.max_replicas_per_service = None
+ else:
+ self.max_replicas_per_service = 1
def _service_weight(self, data_hash, service_uuid):
"""Compute the weight of a Keep service endpoint for a data
@@ -769,7 +770,8 @@ class KeepClient(object):
# in that order.
use_services = self._keep_services
if need_writable:
- use_services = self._writable_services
+ use_services = self._writable_services
+ self.using_proxy = self._any_nondisk_services(use_services)
sorted_roots.extend([
svc['_service_root'] for svc in sorted(
use_services,
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index ab57ed5..a5d9925 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -387,6 +387,23 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
self.assertEqual(0, len(exc_check.exception.request_errors()))
+ def test_oddball_service_get(self):
+ body = 'oddball service get'
+ api_client = self.mock_keep_services(service_type='fancynewblobstore')
+ with tutil.mock_keep_responses(body, 200):
+ keep_client = arvados.KeepClient(api_client=api_client)
+ actual = keep_client.get(tutil.str_keep_locator(body))
+ self.assertEqual(body, actual)
+
+ def test_oddball_service_put(self):
+ body = 'oddball service put'
+ pdh = tutil.str_keep_locator(body)
+ api_client = self.mock_keep_services(service_type='fancynewblobstore')
+ with tutil.mock_keep_responses(pdh, 200):
+ keep_client = arvados.KeepClient(api_client=api_client)
+ actual = keep_client.put(body, copies=1)
+ self.assertEqual(pdh, actual)
+
@tutil.skip_sleep
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
commit 0bf65e8de6c9b2986e2b2d968afe3aff70d390fc
Author: Brett Smith <brett at curoverse.com>
Date: Wed Nov 11 12:18:46 2015 -0500
7696: Clean imports in PySDK arvados.keep module.
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 8ed86fd..b3ccc3e 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -1,28 +1,15 @@
-import bz2
+import cStringIO
import datetime
-import fcntl
-import functools
-import gflags
import hashlib
-import json
import logging
import os
-import pprint
import pycurl
import Queue
import re
import socket
import ssl
-import string
-import cStringIO
-import subprocess
-import sys
import threading
-import time
import timer
-import types
-import UserDict
-import zlib
import arvados
import arvados.config as config
commit 113d76e501d66423aa0ce442f0331aca36fd3726
Author: Brett Smith <brett at curoverse.com>
Date: Wed Nov 11 10:06:51 2015 -0500
7696: Refactor locator builder method in PySDK tests.
diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py
index ea318c6..b2cf436 100644
--- a/sdk/python/tests/arvados_testutil.py
+++ b/sdk/python/tests/arvados_testutil.py
@@ -47,6 +47,8 @@ def mock_api_responses(api_client, body, codes, headers={}):
return mock.patch.object(api_client._http, 'request', side_effect=queue_with((
(fake_httplib2_response(code, **headers), body) for code in codes)))
+def str_keep_locator(s):
+ return '{}+{}'.format(hashlib.md5(s).hexdigest(), len(s))
class FakeCurl:
@classmethod
@@ -126,8 +128,7 @@ class MockStreamReader(object):
def __init__(self, name='.', *data):
self._name = name
self._data = ''.join(data)
- self._data_locators = ['{}+{}'.format(hashlib.md5(d).hexdigest(),
- len(d)) for d in data]
+ self._data_locators = [str_keep_locator(d) for d in data]
self.num_retries = 0
def name(self):
diff --git a/sdk/python/tests/manifest_examples.py b/sdk/python/tests/manifest_examples.py
index 2d8e475..8f0abd2 100644
--- a/sdk/python/tests/manifest_examples.py
+++ b/sdk/python/tests/manifest_examples.py
@@ -1,6 +1,5 @@
import arvados
import arvados_testutil as tutil
-import hashlib
class ManifestExamples(object):
def make_manifest(self,
@@ -9,8 +8,7 @@ class ManifestExamples(object):
files_per_stream=1,
streams=1):
datablip = 'x' * bytes_per_block
- data_loc = '{}+{}'.format(hashlib.md5(datablip).hexdigest(),
- bytes_per_block)
+ data_loc = tutil.str_keep_locator(datablip)
with tutil.mock_keep_responses(data_loc, 200):
coll = arvados.CollectionWriter()
for si in range(0, streams):
diff --git a/sdk/python/tests/test_arv_ls.py b/sdk/python/tests/test_arv_ls.py
index 90bbacf..664b57f 100644
--- a/sdk/python/tests/test_arv_ls.py
+++ b/sdk/python/tests/test_arv_ls.py
@@ -1,7 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-import hashlib
import io
import random
@@ -11,6 +10,8 @@ import arvados.errors as arv_error
import arvados.commands.ls as arv_ls
import run_test_server
+from arvados_testutil import str_keep_locator
+
class ArvLsTestCase(run_test_server.TestCaseWithServers):
FAKE_UUID = 'zzzzz-4zz18-12345abcde12345'
@@ -24,8 +25,7 @@ class ArvLsTestCase(run_test_server.TestCaseWithServers):
def mock_api_for_manifest(self, manifest_lines, uuid=FAKE_UUID):
manifest_text = self.newline_join(manifest_lines)
- pdh = '{}+{}'.format(hashlib.md5(manifest_text).hexdigest(),
- len(manifest_text))
+ pdh = str_keep_locator(manifest_text)
coll_info = {'uuid': uuid,
'portable_data_hash': pdh,
'manifest_text': manifest_text}
diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py
index 330dd44..ea86614 100644
--- a/sdk/python/tests/test_arvfile.py
+++ b/sdk/python/tests/test_arvfile.py
@@ -6,7 +6,6 @@ import io
import mock
import os
import unittest
-import hashlib
import time
import arvados
@@ -30,7 +29,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
self.requests.append(locator)
return self.blocks.get(locator)
def put(self, data, num_retries=None):
- pdh = "%s+%i" % (hashlib.md5(data).hexdigest(), len(data))
+ pdh = tutil.str_keep_locator(data)
self.blocks[pdh] = str(data)
return pdh
@@ -453,7 +452,7 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase):
n = 0
blocks = {}
for d in ['01234', '34567', '67890']:
- loc = '{}+{}'.format(hashlib.md5(d).hexdigest(), len(d))
+ loc = tutil.str_keep_locator(d)
blocks[loc] = d
stream.append(Range(loc, n, len(d)))
n += len(d)
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index ac7dd1b..bc3c0bf 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -4,7 +4,6 @@
import arvados
import copy
-import hashlib
import mock
import os
import pprint
@@ -395,7 +394,7 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
def test_write_directory_tree_with_zero_recursion(self):
cwriter = arvados.CollectionWriter(self.api_client)
content = 'd1/d2/f3d1/f2f1'
- blockhash = hashlib.md5(content).hexdigest() + '+' + str(len(content))
+ blockhash = tutil.str_keep_locator(content)
cwriter.write_directory_tree(
self.build_directory_tree(['f1', 'd1/f2', 'd1/d2/f3']),
max_manifest_depth=0)
@@ -739,7 +738,7 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
self.assertEqual('.', writer.current_stream_name())
self.assertEqual('out', writer.current_file_name())
out_file.write('test data')
- data_loc = hashlib.md5('test data').hexdigest() + '+9'
+ data_loc = tutil.str_keep_locator('test data')
self.assertTrue(out_file.closed, "writer file not closed after context")
self.assertRaises(ValueError, out_file.write, 'extra text')
with self.mock_keep(data_loc, 200) as keep_mock:
@@ -751,15 +750,15 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
writer = arvados.CollectionWriter(client)
with writer.open('six') as out_file:
out_file.writelines(['12', '34', '56'])
- data_loc = hashlib.md5('123456').hexdigest() + '+6'
+ data_loc = tutil.str_keep_locator('123456')
with self.mock_keep(data_loc, 200) as keep_mock:
self.assertEqual(". {} 0:6:six\n".format(data_loc),
writer.manifest_text())
def test_open_flush(self):
client = self.api_client_mock()
- data_loc1 = hashlib.md5('flush1').hexdigest() + '+6'
- data_loc2 = hashlib.md5('flush2').hexdigest() + '+6'
+ data_loc1 = tutil.str_keep_locator('flush1')
+ data_loc2 = tutil.str_keep_locator('flush2')
with self.mock_keep((data_loc1, 200), (data_loc2, 200)) as keep_mock:
writer = arvados.CollectionWriter(client)
with writer.open('flush_test') as out_file:
@@ -777,15 +776,15 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
out_file.write('1st')
with writer.open('.', '2') as out_file:
out_file.write('2nd')
- data_loc = hashlib.md5('1st2nd').hexdigest() + '+6'
+ data_loc = tutil.str_keep_locator('1st2nd')
with self.mock_keep(data_loc, 200) as keep_mock:
self.assertEqual(". {} 0:3:1 3:3:2\n".format(data_loc),
writer.manifest_text())
def test_two_opens_two_streams(self):
client = self.api_client_mock()
- data_loc1 = hashlib.md5('file').hexdigest() + '+4'
- data_loc2 = hashlib.md5('indir').hexdigest() + '+5'
+ data_loc1 = tutil.str_keep_locator('file')
+ data_loc2 = tutil.str_keep_locator('indir')
with self.mock_keep((data_loc1, 200), (data_loc2, 200)) as keep_mock:
writer = arvados.CollectionWriter(client)
with writer.open('file') as out_file:
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 9046892..ab57ed5 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -368,7 +368,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
def test_put_error_does_not_include_successful_puts(self):
data = 'partial failure test'
- data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
+ data_loc = tutil.str_keep_locator(data)
api_client = self.mock_keep_services(count=3)
with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
@@ -378,7 +378,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
def test_proxy_put_with_no_writable_services(self):
data = 'test with no writable services'
- data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
+ data_loc = tutil.str_keep_locator(data)
api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
@@ -518,7 +518,7 @@ class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
def check_64_zeros_error_order(self, verb, exc_class):
data = '0' * 64
if verb == 'get':
- data = hashlib.md5(data).hexdigest() + '+1234'
+ data = tutil.str_keep_locator(data)
# Arbitrary port number:
aport = random.randint(1024,65535)
api_client = self.mock_keep_services(service_port=aport, count=self.services)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list