[ARVADOS] created: e61de9add861db4c043341f3926acb95ded37862
git at public.curoverse.com
git at public.curoverse.com
Tue Nov 11 16:31:04 EST 2014
at e61de9add861db4c043341f3926acb95ded37862 (commit)
commit e61de9add861db4c043341f3926acb95ded37862
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Nov 11 16:30:58 2014 -0500
2853: Add comment.
diff --git a/sdk/go/keepclient/root_sorter_test.go b/sdk/go/keepclient/root_sorter_test.go
index 0d45e90..7fe6a70 100644
--- a/sdk/go/keepclient/root_sorter_test.go
+++ b/sdk/go/keepclient/root_sorter_test.go
@@ -43,6 +43,8 @@ func (*RootSorterSuite) JustOneRoot(c *C) {
func (*RootSorterSuite) ReferenceSet(c *C) {
fakeroots := FakeServiceRoots(16)
+ // These reference probe orders are explained further in
+ // ../../python/arvados/keep.py:
expected_orders := []string{
"3eab2d5fc9681074",
"097dba52e648f1c3",
commit 1237511f9da5ee20588d44be4db5f9e37cfc6400
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Nov 11 16:20:12 2014 -0500
2853: Use the same keep_service UUIDs so tests behave reliably.
diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go
index 608e714..f02f982 100644
--- a/sdk/go/keepclient/keepclient_test.go
+++ b/sdk/go/keepclient/keepclient_test.go
@@ -617,12 +617,13 @@ func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
os.Setenv("ARVADOS_API_HOST", "localhost:3000")
os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
+ content := []byte("TestPutGetHead")
arv, err := arvadosclient.MakeArvadosClient()
kc, err := MakeKeepClient(&arv)
c.Assert(err, Equals, nil)
- hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ hash := fmt.Sprintf("%x", md5.Sum(content))
{
n, _, err := kc.Ask(hash)
@@ -630,25 +631,25 @@ func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
c.Check(n, Equals, int64(0))
}
{
- hash2, replicas, err := kc.PutB([]byte("foo"))
- c.Check(hash2, Equals, fmt.Sprintf("%s+%v", hash, 3))
+ hash2, replicas, err := kc.PutB(content)
+ c.Check(hash2, Equals, fmt.Sprintf("%s+%d", hash, len(content)))
c.Check(replicas, Equals, 2)
c.Check(err, Equals, nil)
}
{
r, n, url2, err := kc.Get(hash)
c.Check(err, Equals, nil)
- c.Check(n, Equals, int64(3))
+ c.Check(n, Equals, int64(len(content)))
c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
- content, err2 := ioutil.ReadAll(r)
+ read_content, err2 := ioutil.ReadAll(r)
c.Check(err2, Equals, nil)
- c.Check(content, DeepEquals, []byte("foo"))
+ c.Check(read_content, DeepEquals, content)
}
{
n, url2, err := kc.Ask(hash)
c.Check(err, Equals, nil)
- c.Check(n, Equals, int64(3))
+ c.Check(n, Equals, int64(len(content)))
c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
}
}
diff --git a/sdk/python/tests/run_test_server.py b/sdk/python/tests/run_test_server.py
index 1624402..739c754 100644
--- a/sdk/python/tests/run_test_server.py
+++ b/sdk/python/tests/run_test_server.py
@@ -178,8 +178,18 @@ def run_keep(blob_signing_key=None, enforce_permissions=False):
for d in api.keep_disks().list().execute()['items']:
api.keep_disks().delete(uuid=d['uuid']).execute()
- s1 = api.keep_services().create(body={"keep_service": {"service_host": "localhost", "service_port": 25107, "service_type": "disk"} }).execute()
- s2 = api.keep_services().create(body={"keep_service": {"service_host": "localhost", "service_port": 25108, "service_type": "disk"} }).execute()
+ s1 = api.keep_services().create(body={"keep_service": {
+ "uuid": "zzzzz-bi6l4-5bo5n1iekkjyz6b",
+ "service_host": "localhost",
+ "service_port": 25107,
+ "service_type": "disk"
+ }}).execute()
+ s2 = api.keep_services().create(body={"keep_service": {
+ "uuid": "zzzzz-bi6l4-2nz60e0ksj7vr3s",
+ "service_host": "localhost",
+ "service_port": 25108,
+ "service_type": "disk"
+ }}).execute()
api.keep_disks().create(body={"keep_disk": {"keep_service_uuid": s1["uuid"] } }).execute()
api.keep_disks().create(body={"keep_disk": {"keep_service_uuid": s2["uuid"] } }).execute()
commit fa3e996ee452bd9be853dd9e93aaec15623708f5
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Nov 11 15:02:07 2014 -0500
2853: Update tests to survive ServiceRoots being a map[string]string.
diff --git a/services/keepproxy/keepproxy_test.go b/services/keepproxy/keepproxy_test.go
index f6d163c..88ac8a6 100644
--- a/services/keepproxy/keepproxy_test.go
+++ b/services/keepproxy/keepproxy_test.go
@@ -143,11 +143,14 @@ func runProxy(c *C, args []string, token string, port int) keepclient.KeepClient
os.Setenv("ARVADOS_KEEP_PROXY", fmt.Sprintf("http://localhost:%v", port))
os.Setenv("ARVADOS_API_TOKEN", token)
arv, err := arvadosclient.MakeArvadosClient()
+ c.Assert(err, Equals, nil)
kc, err := keepclient.MakeKeepClient(&arv)
+ c.Assert(err, Equals, nil)
c.Check(kc.Using_proxy, Equals, true)
c.Check(len(kc.ServiceRoots()), Equals, 1)
- c.Check(kc.ServiceRoots()[0], Equals, fmt.Sprintf("http://localhost:%v", port))
- c.Check(err, Equals, nil)
+ for _, root := range(kc.ServiceRoots()) {
+ c.Check(root, Equals, fmt.Sprintf("http://localhost:%v", port))
+ }
os.Setenv("ARVADOS_KEEP_PROXY", "")
log.Print("keepclient created")
return kc
@@ -165,12 +168,15 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
os.Setenv("ARVADOS_EXTERNAL_CLIENT", "true")
arv, err := arvadosclient.MakeArvadosClient()
+ c.Assert(err, Equals, nil)
kc, err := keepclient.MakeKeepClient(&arv)
+ c.Assert(err, Equals, nil)
c.Check(kc.Arvados.External, Equals, true)
c.Check(kc.Using_proxy, Equals, true)
c.Check(len(kc.ServiceRoots()), Equals, 1)
- c.Check(kc.ServiceRoots()[0], Equals, "http://localhost:29950")
- c.Check(err, Equals, nil)
+ for _, root := range kc.ServiceRoots() {
+ c.Check(root, Equals, "http://localhost:29950")
+ }
os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
log.Print("keepclient created")
commit 1304e044aa87a65145bf8b6d4bc141586556c0ed
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Nov 11 14:31:49 2014 -0500
2853: Add tests for reference set and some edge cases.
diff --git a/sdk/go/keepclient/root_sorter_test.go b/sdk/go/keepclient/root_sorter_test.go
new file mode 100644
index 0000000..0d45e90
--- /dev/null
+++ b/sdk/go/keepclient/root_sorter_test.go
@@ -0,0 +1,61 @@
+package keepclient
+
+import (
+ "crypto/md5"
+ "fmt"
+ . "gopkg.in/check.v1"
+ "strconv"
+ "strings"
+)
+
+type RootSorterSuite struct{}
+var _ = Suite(&RootSorterSuite{})
+
+func FakeSvcRoot(i uint64) (string) {
+ return fmt.Sprintf("https://%x.svc/", i)
+}
+
+func FakeSvcUuid(i uint64) (string) {
+ return fmt.Sprintf("zzzzz-bi6l4-%015x", i)
+}
+
+func FakeServiceRoots(n uint64) (map[string]string) {
+ sr := map[string]string{}
+ for i := uint64(0); i < n; i ++ {
+ sr[FakeSvcUuid(i)] = FakeSvcRoot(i)
+ }
+ return sr
+}
+
+func Md5String(data string) (string) {
+ return fmt.Sprintf("%032x", md5.Sum([]byte(data)))
+}
+
+func (*RootSorterSuite) EmptyRoots(c *C) {
+ rs := NewRootSorter(map[string]string{}, Md5String("foo"))
+ c.Check(rs.GetSortedRoots(), Equals, []string{})
+}
+
+func (*RootSorterSuite) JustOneRoot(c *C) {
+ rs := NewRootSorter(FakeServiceRoots(1), Md5String("foo"))
+ c.Check(rs.GetSortedRoots(), Equals, []string{FakeSvcRoot(0)})
+}
+
+func (*RootSorterSuite) ReferenceSet(c *C) {
+ fakeroots := FakeServiceRoots(16)
+ expected_orders := []string{
+ "3eab2d5fc9681074",
+ "097dba52e648f1c3",
+ "c5b4e023f8a7d691",
+ "9d81c02e76a3bf54",
+ }
+ for h, expected_order := range expected_orders {
+ hash := Md5String(fmt.Sprintf("%064x", h))
+ roots := NewRootSorter(fakeroots, hash).GetSortedRoots()
+ for i, svc_id_s := range strings.Split(expected_order, "") {
+ svc_id, err := strconv.ParseUint(svc_id_s, 16, 64)
+ c.Assert(err, Equals, nil)
+ c.Check(roots[i], Equals, FakeSvcRoot(svc_id))
+ }
+ }
+}
commit 47b0cb35b5ee933757c4342bb75fd286c1dac8cb
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Nov 11 13:57:15 2014 -0500
2853: Add "reference set" test to check probe order agreement between implementations.
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 2cb0317..72bc741 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -1,6 +1,7 @@
import hashlib
import mock
import os
+import re
import socket
import unittest
import urlparse
@@ -21,11 +22,35 @@ class KeepRendezvousWeightTestCase(unittest.TestCase):
def addServices(self, n):
for x in range(n):
uuid = "zzzzz-bi6l4-{:015x}".format(self.n_services)
- uri = "https://[0.0.0.{}]:25107/".format(self.n_services)
+ uri = "https://keep0x{:x}.zzzzz.arvadosapi.com:25107/".format(self.n_services)
self.keep_client._keep_services.append(
{'uuid': uuid, '_service_root': uri})
self.n_services += 1
+ def test_ProbeOrderReferenceSet(self):
+ # expected_order[i] is the probe order for
+ # hash=md5(sprintf("%064x",i)) where there are 16 services
+ # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
+ # the first probe for the block consisting of 64 "0"
+ # characters is the service whose uuid is
+ # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
+ expected_order = [
+ list('3eab2d5fc9681074'),
+ list('097dba52e648f1c3'),
+ list('c5b4e023f8a7d691'),
+ list('9d81c02e76a3bf54'),
+ ]
+ hashes = [
+ hashlib.md5("{:064x}".format(x)).hexdigest()
+ for x in range(len(expected_order))]
+ self.addServices(16)
+ for i, hash in enumerate(hashes):
+ roots = self.keep_client.weighted_service_roots(hash)
+ got_order = [
+ re.search(r'//keep0x([0-9a-f]+)', root).group(1)
+ for root in roots]
+ self.assertEqual(got_order, expected_order[i])
+
def test_ProbeWasteAddingOneServer(self):
hashes = [
hashlib.md5("{:064x}".format(x)).hexdigest() for x in range(100)]
commit e02ef893c4f6cf881e449c248782b2ac21b49b0f
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Nov 11 12:44:02 2014 -0500
2853: Fix instance ID in test fixture hostnames. Clean up brittle tests.
diff --git a/services/api/test/fixtures/keep_services.yml b/services/api/test/fixtures/keep_services.yml
index 84ac316..f668cbc 100644
--- a/services/api/test/fixtures/keep_services.yml
+++ b/services/api/test/fixtures/keep_services.yml
@@ -1,7 +1,7 @@
keep0:
uuid: zzzzz-bi6l4-6zhilxar6r8ey90
owner_uuid: zzzzz-tpzed-d9tiejq69daie8f
- service_host: keep0.qr1hi.arvadosapi.com
+ service_host: keep0.zzzzz.arvadosapi.com
service_port: 25107
service_ssl_flag: false
service_type: disk
@@ -9,7 +9,7 @@ keep0:
keep1:
uuid: zzzzz-bi6l4-rsnj3c76ndxb7o0
owner_uuid: zzzzz-tpzed-d9tiejq69daie8f
- service_host: keep1.qr1hi.arvadosapi.com
+ service_host: keep1.zzzzz.arvadosapi.com
service_port: 25107
service_ssl_flag: false
service_type: disk
@@ -17,7 +17,7 @@ keep1:
proxy:
uuid: zzzzz-bi6l4-h0a0xwut9qa6g3a
owner_uuid: zzzzz-tpzed-d9tiejq69daie8f
- service_host: keep.qr1hi.arvadosapi.com
+ service_host: keep.zzzzz.arvadosapi.com
service_port: 25333
service_ssl_flag: true
service_type: proxy
diff --git a/services/api/test/integration/keep_proxy_test.rb b/services/api/test/integration/keep_proxy_test.rb
index d4155c2..aacda51 100644
--- a/services/api/test/integration/keep_proxy_test.rb
+++ b/services/api/test/integration/keep_proxy_test.rb
@@ -6,20 +6,23 @@ class KeepProxyTest < ActionDispatch::IntegrationTest
assert_response :success
services = json_response['items']
- assert_equal 2, services.length
- assert_equal 'disk', services[0]['service_type']
- assert_equal 'disk', services[1]['service_type']
+ assert_operator 2, :<=, services.length
+ services.each do |service|
+ assert_equal 'disk', service['service_type']
+ end
+ end
+ test "request keep proxy" do
get "/arvados/v1/keep_services/accessible", {:format => :json}, auth(:active).merge({'HTTP_X_EXTERNAL_CLIENT' => '1'})
assert_response :success
services = json_response['items']
assert_equal 1, services.length
- assert_equal "zzzzz-bi6l4-h0a0xwut9qa6g3a", services[0]['uuid']
- assert_equal "keep.qr1hi.arvadosapi.com", services[0]['service_host']
- assert_equal 25333, services[0]['service_port']
- assert_equal true, services[0]['service_ssl_flag']
+ assert_equal keep_services(:proxy).uuid, services[0]['uuid']
+ assert_equal keep_services(:proxy).service_host, services[0]['service_host']
+ assert_equal keep_services(:proxy).service_port, services[0]['service_port']
+ assert_equal keep_services(:proxy).service_ssl_flag, services[0]['service_ssl_flag']
assert_equal 'proxy', services[0]['service_type']
end
end
commit 9405b5f224b205e681e378bd2d90d18f3638139b
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Nov 11 12:11:08 2014 -0500
2853: Use rendezvous hashing to select probe order in Python library.
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 46bd1cb..4c288f9 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -22,15 +22,15 @@ import datetime
import ssl
import socket
-_logger = logging.getLogger('arvados.keep')
-global_client_object = None
-
import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util
+_logger = logging.getLogger('arvados.keep')
+global_client_object = None
+
class KeepLocator(object):
EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
@@ -289,7 +289,7 @@ class KeepClient(object):
def last_status(self):
try:
return int(self.last_result[0].status)
- except (AttributeError, IndexError, ValueError):
+ except (AttributeError, IndexError, ValueError, TypeError):
return None
def get(self, http, locator):
@@ -448,9 +448,12 @@ class KeepClient(object):
if not proxy.endswith('/'):
proxy += '/'
self.api_token = api_token
- self.service_roots = [proxy]
+ self._keep_services = [{
+ 'uuid': 'proxy',
+ '_service_root': proxy,
+ }]
self.using_proxy = True
- self.static_service_roots = True
+ self._static_services_list = True
else:
# It's important to avoid instantiating an API client
# unless we actually need one, for testing's sake.
@@ -458,13 +461,13 @@ class KeepClient(object):
api_client = arvados.api('v1')
self.api_client = api_client
self.api_token = api_client.api_token
- self.service_roots = None
+ self._keep_services = None
self.using_proxy = None
- self.static_service_roots = False
+ self._static_services_list = False
- def build_service_roots(self, force_rebuild=False):
- if (self.static_service_roots or
- (self.service_roots and not force_rebuild)):
+ def build_services_list(self, force_rebuild=False):
+ if (self._static_services_list or
+ (self._keep_services and not force_rebuild)):
return
with self.lock:
try:
@@ -472,68 +475,47 @@ class KeepClient(object):
except Exception: # API server predates Keep services.
keep_services = self.api_client.keep_disks().list()
- keep_services = keep_services.execute().get('items')
- if not keep_services:
+ self._keep_services = keep_services.execute().get('items')
+ if not self._keep_services:
raise arvados.errors.NoKeepServersError()
self.using_proxy = any(ks.get('service_type') == 'proxy'
- for ks in keep_services)
-
- roots = ("{}://[{}]:{:d}/".format(
- 'https' if ks['service_ssl_flag'] else 'http',
- ks['service_host'],
- ks['service_port'])
- for ks in keep_services)
- self.service_roots = sorted(set(roots))
- _logger.debug(str(self.service_roots))
-
- def shuffled_service_roots(self, hash, force_rebuild=False):
- self.build_service_roots(force_rebuild)
-
- # Build an ordering with which to query the Keep servers based on the
- # contents of the hash.
- # "hash" is a hex-encoded number at least 8 digits
- # (32 bits) long
-
- # seed used to calculate the next keep server from 'pool'
- # to be added to 'pseq'
- seed = hash
-
- # Keep servers still to be added to the ordering
- pool = self.service_roots[:]
-
- # output probe sequence
- pseq = []
-
- # iterate while there are servers left to be assigned
- while len(pool) > 0:
- if len(seed) < 8:
- # ran out of digits in the seed
- if len(pseq) < len(hash) / 4:
- # the number of servers added to the probe sequence is less
- # than the number of 4-digit slices in 'hash' so refill the
- # seed with the last 4 digits and then append the contents
- # of 'hash'.
- seed = hash[-4:] + hash
- else:
- # refill the seed with the contents of 'hash'
- seed += hash
-
- # Take the next 8 digits (32 bytes) and interpret as an integer,
- # then modulus with the size of the remaining pool to get the next
- # selected server.
- probe = int(seed[0:8], 16) % len(pool)
-
- # Append the selected server to the probe sequence and remove it
- # from the pool.
- pseq += [pool[probe]]
- pool = pool[:probe] + pool[probe+1:]
-
- # Remove the digits just used from the seed
- seed = seed[8:]
- _logger.debug(str(pseq))
- return pseq
+ for ks in self._keep_services)
+
+ # Precompute the base URI for each service.
+ for r in self._keep_services:
+ r['_service_root'] = "{}://[{}]:{:d}/".format(
+ 'https' if r['service_ssl_flag'] else 'http',
+ r['service_host'],
+ r['service_port'])
+ _logger.debug(str(self._keep_services))
+
+ def _service_weight(self, hash, service_uuid):
+ """Compute the weight of a Keep service endpoint for a data
+ block with a known hash.
+
+ The weight is md5(h + u) where u is the last 15 characters of
+ the service endpoint's UUID.
+ """
+ return hashlib.md5(hash + service_uuid[-15:]).hexdigest()
+ def weighted_service_roots(self, hash, force_rebuild=False):
+ """Return an array of Keep service endpoints, in the order in
+ which they should be probed when reading or writing data with
+ the given hash.
+ """
+ self.build_services_list(force_rebuild)
+
+ # Sort the available services by weight (heaviest first) for
+ # this hash, and return their service_roots (base URIs) in
+ # that order.
+ sorted_roots = [
+ svc['_service_root'] for svc in sorted(
+ self._keep_services,
+ reverse=True,
+ key=lambda svc: self._service_weight(hash, svc['uuid']))]
+ _logger.debug(hash + ': ' + str(sorted_roots))
+ return sorted_roots
def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
# roots_map is a dictionary, mapping Keep service root strings
@@ -541,7 +523,7 @@ class KeepClient(object):
# new ones to roots_map. Return the current list of local
# root strings.
headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
- local_roots = self.shuffled_service_roots(md5_s, force_rebuild)
+ local_roots = self.weighted_service_roots(md5_s, force_rebuild)
for root in local_roots:
if root not in roots_map:
roots_map[root] = self.KeepService(root, **headers)
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index d07d6e1..2cb0317 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -1,3 +1,4 @@
+import hashlib
import mock
import os
import socket
@@ -9,6 +10,60 @@ import arvados.retry
import arvados_testutil as tutil
import run_test_server
+class KeepRendezvousWeightTestCase(unittest.TestCase):
+ def setUp(self):
+ self.keep_client = arvados.KeepClient(
+ api_client=mock.MagicMock(name='api_client'),
+ proxy='', local_store='')
+ self.keep_client._keep_services = []
+ self.n_services = 0
+
+ def addServices(self, n):
+ for x in range(n):
+ uuid = "zzzzz-bi6l4-{:015x}".format(self.n_services)
+ uri = "https://[0.0.0.{}]:25107/".format(self.n_services)
+ self.keep_client._keep_services.append(
+ {'uuid': uuid, '_service_root': uri})
+ self.n_services += 1
+
+ def test_ProbeWasteAddingOneServer(self):
+ hashes = [
+ hashlib.md5("{:064x}".format(x)).hexdigest() for x in range(100)]
+ initial_services = 12
+ self.addServices(initial_services)
+ probes_before = [
+ self.keep_client.weighted_service_roots(hash) for hash in hashes]
+ for added_services in range(1, 12):
+ self.addServices(1)
+ total_penalty = 0
+ for hash_index in range(len(hashes)):
+ probe_after = self.keep_client.weighted_service_roots(
+ hashes[hash_index])
+ penalty = probe_after.index(probes_before[hash_index][0])
+ self.assertLessEqual(penalty, added_services)
+ total_penalty += penalty
+ # Average penalty per block should not exceed
+ # N(added)/N(orig) by more than 20%, and should get closer
+ # to the ideal as we add data points.
+ expect_penalty = (
+ added_services *
+ len(hashes) / initial_services)
+ max_penalty = (
+ expect_penalty *
+ (120 - added_services)/100)
+ min_penalty = (
+ expect_penalty * 8/10)
+ self.assertTrue(
+ min_penalty <= total_penalty <= max_penalty,
+ "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
+ initial_services,
+ added_services,
+ len(hashes),
+ total_penalty,
+ min_penalty,
+ max_penalty))
+
+
class KeepTestCase(run_test_server.TestCaseWithServers):
MAIN_SERVER = {}
KEEP_SERVER = {}
@@ -249,7 +304,7 @@ class KeepClientServiceTestCase(unittest.TestCase):
def get_service_roots(self, *services):
api_client = self.mock_keep_services(*services)
keep_client = arvados.KeepClient(api_client=api_client)
- services = keep_client.shuffled_service_roots('000000')
+ services = keep_client.weighted_service_roots('000000')
return [urlparse.urlparse(url) for url in sorted(services)]
def test_ssl_flag_respected_in_roots(self):
commit a4b55a80251d666ca39d0f8d201c70e493b4c661
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Nov 11 05:14:21 2014 -0500
2853: Use rendezvous hashing to select probe order.
diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index e1c25c9..326c2a0 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -12,7 +12,6 @@ import (
"log"
"net/http"
"regexp"
- "sort"
"strings"
"sync"
"sync/atomic"
@@ -36,7 +35,7 @@ type KeepClient struct {
Arvados *arvadosclient.ArvadosClient
Want_replicas int
Using_proxy bool
- service_roots *[]string
+ service_roots *map[string]string
lock sync.Mutex
Client *http.Client
}
@@ -133,7 +132,7 @@ func (this KeepClient) AuthorizedGet(hash string,
contentLength int64, url string, err error) {
// Calculate the ordering for asking servers
- sv := this.shuffledServiceRoots(hash)
+ sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
for _, host := range sv {
var req *http.Request
@@ -175,7 +174,7 @@ func (this KeepClient) Ask(hash string) (contentLength int64, url string, err er
func (this KeepClient) AuthorizedAsk(hash string, signature string,
timestamp string) (contentLength int64, url string, err error) {
// Calculate the ordering for asking servers
- sv := this.shuffledServiceRoots(hash)
+ sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
for _, host := range sv {
var req *http.Request
@@ -208,20 +207,19 @@ func (this KeepClient) AuthorizedAsk(hash string, signature string,
}
// Atomically read the service_roots field.
-func (this *KeepClient) ServiceRoots() []string {
- r := (*[]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots))))
+func (this *KeepClient) ServiceRoots() map[string]string {
+ r := (*map[string]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots))))
return *r
}
// Atomically update the service_roots field. Enables you to update
// service_roots without disrupting any GET or PUT operations that might
// already be in progress.
-func (this *KeepClient) SetServiceRoots(svc []string) {
- // Must be sorted for ShuffledServiceRoots() to produce consistent
- // results.
- roots := make([]string, len(svc))
- copy(roots, svc)
- sort.Strings(roots)
+func (this *KeepClient) SetServiceRoots(new_roots map[string]string) {
+ roots := make(map[string]string)
+ for uuid, root := range new_roots {
+ roots[uuid] = root
+ }
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)),
unsafe.Pointer(&roots))
}
diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go
index 04be03b..608e714 100644
--- a/sdk/go/keepclient/keepclient_test.go
+++ b/sdk/go/keepclient/keepclient_test.go
@@ -87,20 +87,27 @@ func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
c.Assert(err, Equals, nil)
c.Check(len(kc.ServiceRoots()), Equals, 2)
- c.Check(kc.ServiceRoots()[0], Equals, "http://localhost:25107")
- c.Check(kc.ServiceRoots()[1], Equals, "http://localhost:25108")
+ for _, root := range kc.ServiceRoots() {
+ c.Check(root, Matches, "http://localhost:2510[\\d]")
+ }
}
func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
+ roots := map[string]string{
+ "zzzzz-bi6l4-2q7dq8becevdqfb": "http://localhost:1",
+ "zzzzz-bi6l4-4gbhck2w7lq0d96": "http://localhost:2",
+ "zzzzz-bi6l4-4bt69dsk0quh7ae": "http://localhost:3",
+ "zzzzz-bi6l4-62w1fgd0ud2krxl": "http://localhost:4",
+ }
kc := KeepClient{}
- kc.SetServiceRoots([]string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"})
+ kc.SetServiceRoots(roots)
// "foo" acbd18db4cc2f85cedef654fccc4a4d8
- foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
+ foo_shuffle := []string{"http://localhost:4", "http://localhost:1", "http://localhost:3", "http://localhost:2"}
c.Check(kc.shuffledServiceRoots("acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle)
// "bar" 37b51d194a7513e45b56f6524f2d51f2
- bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"}
+ bar_shuffle := []string{"http://localhost:3", "http://localhost:2", "http://localhost:4", "http://localhost:1"}
c.Check(kc.shuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
}
@@ -273,19 +280,19 @@ func (s *StandaloneSuite) TestPutB(c *C) {
hash,
"abc123",
"foo",
- make(chan string, 2)}
+ make(chan string, 5)}
arv, _ := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
kc.Want_replicas = 2
arv.ApiToken = "abc123"
- service_roots := make([]string, 5)
+ service_roots := make(map[string]string)
ks := RunSomeFakeKeepServers(st, 5, 2990)
for i := 0; i < len(ks); i += 1 {
- service_roots[i] = ks[i].url
+ service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = ks[i].url
defer ks[i].listener.Close()
}
@@ -315,19 +322,19 @@ func (s *StandaloneSuite) TestPutHR(c *C) {
hash,
"abc123",
"foo",
- make(chan string, 2)}
+ make(chan string, 5)}
arv, _ := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
kc.Want_replicas = 2
arv.ApiToken = "abc123"
- service_roots := make([]string, 5)
+ service_roots := make(map[string]string)
ks := RunSomeFakeKeepServers(st, 5, 2990)
for i := 0; i < len(ks); i += 1 {
- service_roots[i] = ks[i].url
+ service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = ks[i].url
defer ks[i].listener.Close()
}
@@ -366,7 +373,7 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) {
hash,
"abc123",
"foo",
- make(chan string, 2)}
+ make(chan string, 4)}
fh := FailHandler{
make(chan string, 1)}
@@ -376,17 +383,17 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) {
kc.Want_replicas = 2
arv.ApiToken = "abc123"
- service_roots := make([]string, 5)
+ service_roots := make(map[string]string)
ks1 := RunSomeFakeKeepServers(st, 4, 2990)
ks2 := RunSomeFakeKeepServers(fh, 1, 2995)
for i, k := range ks1 {
- service_roots[i] = k.url
+ service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
defer k.listener.Close()
}
for i, k := range ks2 {
- service_roots[len(ks1)+i] = k.url
+ service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
defer k.listener.Close()
}
@@ -425,29 +432,27 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
kc.Want_replicas = 2
arv.ApiToken = "abc123"
- service_roots := make([]string, 5)
+ service_roots := make(map[string]string)
ks1 := RunSomeFakeKeepServers(st, 1, 2990)
ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
for i, k := range ks1 {
- service_roots[i] = k.url
+ service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
defer k.listener.Close()
}
for i, k := range ks2 {
- service_roots[len(ks1)+i] = k.url
+ service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
defer k.listener.Close()
}
kc.SetServiceRoots(service_roots)
- shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
-
_, replicas, err := kc.PutB([]byte("foo"))
c.Check(err, Equals, InsufficientReplicasError)
c.Check(replicas, Equals, 1)
- c.Check(<-st.handled, Equals, shuff[1])
+ c.Check(<-st.handled, Matches, ".*2990")
log.Printf("TestPutWithTooManyFail done")
}
@@ -483,7 +488,7 @@ func (s *StandaloneSuite) TestGet(c *C) {
arv, err := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
arv.ApiToken = "abc123"
- kc.SetServiceRoots([]string{url})
+ kc.SetServiceRoots(map[string]string{"x":url})
r, n, url2, err := kc.Get(hash)
defer r.Close()
@@ -509,7 +514,7 @@ func (s *StandaloneSuite) TestGetFail(c *C) {
arv, err := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
arv.ApiToken = "abc123"
- kc.SetServiceRoots([]string{url})
+ kc.SetServiceRoots(map[string]string{"x":url})
r, n, url2, err := kc.Get(hash)
c.Check(err, Equals, BlockNotFound)
@@ -539,7 +544,7 @@ func (s *StandaloneSuite) TestChecksum(c *C) {
arv, err := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
arv.ApiToken = "abc123"
- kc.SetServiceRoots([]string{url})
+ kc.SetServiceRoots(map[string]string{"x":url})
r, n, _, err := kc.Get(barhash)
_, err = ioutil.ReadAll(r)
@@ -557,46 +562,55 @@ func (s *StandaloneSuite) TestChecksum(c *C) {
}
func (s *StandaloneSuite) TestGetWithFailures(c *C) {
-
- hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ content := []byte("waz")
+ hash := fmt.Sprintf("%x", md5.Sum(content))
fh := FailHandler{
- make(chan string, 1)}
+ make(chan string, 4)}
st := StubGetHandler{
c,
hash,
"abc123",
- []byte("foo")}
+ content}
arv, err := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
arv.ApiToken = "abc123"
- service_roots := make([]string, 5)
+ service_roots := make(map[string]string)
ks1 := RunSomeFakeKeepServers(st, 1, 2990)
ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
for i, k := range ks1 {
- service_roots[i] = k.url
+ service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
defer k.listener.Close()
}
for i, k := range ks2 {
- service_roots[len(ks1)+i] = k.url
+ service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
defer k.listener.Close()
}
kc.SetServiceRoots(service_roots)
+ // This test works only if one of the failing services is
+ // attempted before the succeeding service. Otherwise,
+ // <-fh.handled below will just hang! (Probe order depends on
+ // the choice of block content "waz" and the UUIDs of the fake
+ // servers, so we just tried different strings until we found
+ // an example that passes this Assert.)
+ c.Assert(NewRootSorter(service_roots, hash).GetSortedRoots()[0], Matches, ".*299[1-4]")
+
r, n, url2, err := kc.Get(hash)
+
<-fh.handled
c.Check(err, Equals, nil)
c.Check(n, Equals, int64(3))
c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
- content, err2 := ioutil.ReadAll(r)
+ read_content, err2 := ioutil.ReadAll(r)
c.Check(err2, Equals, nil)
- c.Check(content, DeepEquals, []byte("foo"))
+ c.Check(read_content, DeepEquals, content)
}
func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
@@ -659,12 +673,12 @@ func (s *StandaloneSuite) TestPutProxy(c *C) {
kc.Want_replicas = 2
kc.Using_proxy = true
arv.ApiToken = "abc123"
- service_roots := make([]string, 1)
+ service_roots := make(map[string]string)
ks1 := RunSomeFakeKeepServers(st, 1, 2990)
for i, k := range ks1 {
- service_roots[i] = k.url
+ service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
defer k.listener.Close()
}
@@ -690,12 +704,12 @@ func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
kc.Want_replicas = 3
kc.Using_proxy = true
arv.ApiToken = "abc123"
- service_roots := make([]string, 1)
+ service_roots := make(map[string]string)
ks1 := RunSomeFakeKeepServers(st, 1, 2990)
for i, k := range ks1 {
- service_roots[i] = k.url
+ service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
defer k.listener.Close()
}
kc.SetServiceRoots(service_roots)
diff --git a/sdk/go/keepclient/root_sorter.go b/sdk/go/keepclient/root_sorter.go
new file mode 100644
index 0000000..7a73d95
--- /dev/null
+++ b/sdk/go/keepclient/root_sorter.go
@@ -0,0 +1,61 @@
+package keepclient
+
+import (
+ "crypto/md5"
+ "fmt"
+ "sort"
+)
+
+type RootSorter struct { // RootSorter implements sort.Interface to rank service roots by a per-hash weight.
+ root []string // service root URLs, one per entry of the map given to NewRootSorter
+ weight []string // weight[i] is the hex MD5 weight computed for root[i]
+ order []int // permutation of indices into root/weight; the only slice that sorting rearranges
+}
+
+func NewRootSorter(serviceRoots map[string]string, hash string) (*RootSorter) { // NewRootSorter builds a sorter over serviceRoots (service UUID -> root URL), already sorted for hash.
+ rs := new(RootSorter)
+ rs.root = make([]string, len(serviceRoots))
+ rs.weight = make([]string, len(serviceRoots))
+ rs.order = make([]int, len(serviceRoots))
+ i := 0
+ for uuid, root := range serviceRoots { // map iteration order is unspecified; the final order comes from the weights (barring equal weights)
+ rs.root[i] = root
+ rs.weight[i] = rs.getWeight(hash, uuid)
+ rs.order[i] = i // identity permutation until sort.Sort runs
+ i++
+ }
+ sort.Sort(rs) // rearranges rs.order only, heaviest weight first (see Less)
+ return rs
+}
+
+func (rs RootSorter) getWeight(hash string, uuid string) (string) { // getWeight returns the hex MD5 of hash followed by a service key; the receiver is not used.
+ var service_key []byte
+ if len(uuid) == 27 { // 27-character UUID: only its last 15 characters are mixed into the weight
+ service_key = []byte(hash + uuid[12:])
+ } else {
+ // Fallback for keys that are not 27 characters long (only useful for testing, a set of one service root, etc.): use the whole key.
+ service_key = []byte(hash + uuid)
+ }
+ return fmt.Sprintf("%x", md5.Sum(service_key)) // 32 lowercase hex digits; Less compares these as strings
+}
+
+func (rs RootSorter) GetSortedRoots() ([]string) { // GetSortedRoots returns a new slice of root URLs in rs.order's sequence (heaviest weight first once NewRootSorter has sorted).
+ sorted := make([]string, len(rs.order))
+ for i := range rs.order {
+ sorted[i] = rs.root[rs.order[i]] // order[i] indexes the root that belongs at position i
+ }
+ return sorted
+}
+
+// Less is really More here: it reports whether position i outweighs position j, so sort.Sort puts the heaviest root at the front of the list.
+func (rs RootSorter) Less(i, j int) bool {
+ return rs.weight[rs.order[j]] < rs.weight[rs.order[i]] // string comparison of hex weights; operands swapped to get descending order
+}
+
+func (rs RootSorter) Len() int { // Len is the number of roots being sorted (part of sort.Interface).
+ return len(rs.order)
+}
+
+func (rs RootSorter) Swap(i, j int) { // Swap exchanges sort positions i and j (part of sort.Interface).
+ sort.IntSlice(rs.order).Swap(i, j) // only the index permutation moves; root and weight keep their original positions
+}
diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go
index ce15ce9..e51dcb3 100644
--- a/sdk/go/keepclient/support.go
+++ b/sdk/go/keepclient/support.go
@@ -10,11 +10,11 @@ import (
"log"
"net/http"
"os"
- "strconv"
"strings"
)
type keepDisk struct {
+ Uuid string `json:"uuid"`
Hostname string `json:"service_host"`
Port int `json:"service_port"`
SSL bool `json:"service_ssl_flag"`
@@ -23,7 +23,8 @@ type keepDisk struct {
func (this *KeepClient) DiscoverKeepServers() error {
if prx := os.Getenv("ARVADOS_KEEP_PROXY"); prx != "" {
- this.SetServiceRoots([]string{prx})
+ sr := map[string]string{"proxy":prx}
+ this.SetServiceRoots(sr)
this.Using_proxy = true
return nil
}
@@ -42,7 +43,7 @@ func (this *KeepClient) DiscoverKeepServers() error {
}
listed := make(map[string]bool)
- service_roots := make([]string, 0, len(m.Items))
+ service_roots := make(map[string]string)
for _, element := range m.Items {
n := ""
@@ -57,7 +58,7 @@ func (this *KeepClient) DiscoverKeepServers() error {
// Skip duplicates
if !listed[url] {
listed[url] = true
- service_roots = append(service_roots, url)
+ service_roots[element.Uuid] = url
}
if element.SvcType == "proxy" {
this.Using_proxy = true
@@ -70,52 +71,7 @@ func (this *KeepClient) DiscoverKeepServers() error {
}
func (this KeepClient) shuffledServiceRoots(hash string) (pseq []string) {
- // Build an ordering with which to query the Keep servers based on the
- // contents of the hash. "hash" is a hex-encoded number at least 8
- // digits (32 bits) long
-
- // seed used to calculate the next keep server from 'pool' to be added
- // to 'pseq'
- seed := hash
-
- // Keep servers still to be added to the ordering
- service_roots := this.ServiceRoots()
- pool := make([]string, len(service_roots))
- copy(pool, service_roots)
-
- // output probe sequence
- pseq = make([]string, 0, len(service_roots))
-
- // iterate while there are servers left to be assigned
- for len(pool) > 0 {
-
- if len(seed) < 8 {
- // ran out of digits in the seed
- if len(pseq) < (len(hash) / 4) {
- // the number of servers added to the probe
- // sequence is less than the number of 4-digit
- // slices in 'hash' so refill the seed with the
- // last 4 digits.
- seed = hash[len(hash)-4:]
- }
- seed += hash
- }
-
- // Take the next 8 digits (32 bytes) and interpret as an integer,
- // then modulus with the size of the remaining pool to get the next
- // selected server.
- probe, _ := strconv.ParseUint(seed[0:8], 16, 32)
- probe %= uint64(len(pool))
-
- // Append the selected server to the probe sequence and remove it
- // from the pool.
- pseq = append(pseq, pool[probe])
- pool = append(pool[:probe], pool[probe+1:]...)
-
- // Remove the digits just used from the seed
- seed = seed[8:]
- }
- return pseq
+ return NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
}
type uploadStatus struct {
@@ -189,7 +145,7 @@ func (this KeepClient) putReplicas(
expectedLength int64) (locator string, replicas int, err error) {
// Calculate the ordering for uploading to servers
- sv := this.shuffledServiceRoots(hash)
+ sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
// The next server to try contacting
next_server := 0
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list