[ARVADOS] created: e61de9add861db4c043341f3926acb95ded37862

git at public.curoverse.com git at public.curoverse.com
Tue Nov 11 16:31:04 EST 2014


        at  e61de9add861db4c043341f3926acb95ded37862 (commit)


commit e61de9add861db4c043341f3926acb95ded37862
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Nov 11 16:30:58 2014 -0500

    2853: Add comment.

diff --git a/sdk/go/keepclient/root_sorter_test.go b/sdk/go/keepclient/root_sorter_test.go
index 0d45e90..7fe6a70 100644
--- a/sdk/go/keepclient/root_sorter_test.go
+++ b/sdk/go/keepclient/root_sorter_test.go
@@ -43,6 +43,8 @@ func (*RootSorterSuite) JustOneRoot(c *C) {
 
 func (*RootSorterSuite) ReferenceSet(c *C) {
 	fakeroots := FakeServiceRoots(16)
+	// These reference probe orders are explained further in
+	// ../../python/arvados/keep.py:
 	expected_orders := []string{
 		"3eab2d5fc9681074",
 		"097dba52e648f1c3",

commit 1237511f9da5ee20588d44be4db5f9e37cfc6400
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Nov 11 16:20:12 2014 -0500

    2853: Use the same keep_service UUIDs so tests behave reliably.

diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go
index 608e714..f02f982 100644
--- a/sdk/go/keepclient/keepclient_test.go
+++ b/sdk/go/keepclient/keepclient_test.go
@@ -617,12 +617,13 @@ func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
 	os.Setenv("ARVADOS_API_HOST", "localhost:3000")
 	os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
 	os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
+	content := []byte("TestPutGetHead")
 
 	arv, err := arvadosclient.MakeArvadosClient()
 	kc, err := MakeKeepClient(&arv)
 	c.Assert(err, Equals, nil)
 
-	hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+	hash := fmt.Sprintf("%x", md5.Sum(content))
 
 	{
 		n, _, err := kc.Ask(hash)
@@ -630,25 +631,25 @@ func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
 		c.Check(n, Equals, int64(0))
 	}
 	{
-		hash2, replicas, err := kc.PutB([]byte("foo"))
-		c.Check(hash2, Equals, fmt.Sprintf("%s+%v", hash, 3))
+		hash2, replicas, err := kc.PutB(content)
+		c.Check(hash2, Equals, fmt.Sprintf("%s+%d", hash, len(content)))
 		c.Check(replicas, Equals, 2)
 		c.Check(err, Equals, nil)
 	}
 	{
 		r, n, url2, err := kc.Get(hash)
 		c.Check(err, Equals, nil)
-		c.Check(n, Equals, int64(3))
+		c.Check(n, Equals, int64(len(content)))
 		c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
 
-		content, err2 := ioutil.ReadAll(r)
+		read_content, err2 := ioutil.ReadAll(r)
 		c.Check(err2, Equals, nil)
-		c.Check(content, DeepEquals, []byte("foo"))
+		c.Check(read_content, DeepEquals, content)
 	}
 	{
 		n, url2, err := kc.Ask(hash)
 		c.Check(err, Equals, nil)
-		c.Check(n, Equals, int64(3))
+		c.Check(n, Equals, int64(len(content)))
 		c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
 	}
 }
diff --git a/sdk/python/tests/run_test_server.py b/sdk/python/tests/run_test_server.py
index 1624402..739c754 100644
--- a/sdk/python/tests/run_test_server.py
+++ b/sdk/python/tests/run_test_server.py
@@ -178,8 +178,18 @@ def run_keep(blob_signing_key=None, enforce_permissions=False):
     for d in api.keep_disks().list().execute()['items']:
         api.keep_disks().delete(uuid=d['uuid']).execute()
 
-    s1 = api.keep_services().create(body={"keep_service": {"service_host": "localhost",  "service_port": 25107, "service_type": "disk"} }).execute()
-    s2 = api.keep_services().create(body={"keep_service": {"service_host": "localhost",  "service_port": 25108, "service_type": "disk"} }).execute()
+    s1 = api.keep_services().create(body={"keep_service": {
+                "uuid": "zzzzz-bi6l4-5bo5n1iekkjyz6b",
+                "service_host": "localhost",
+                "service_port": 25107,
+                "service_type": "disk"
+                }}).execute()
+    s2 = api.keep_services().create(body={"keep_service": {
+                "uuid": "zzzzz-bi6l4-2nz60e0ksj7vr3s",
+                "service_host": "localhost",
+                "service_port": 25108,
+                "service_type": "disk"
+                }}).execute()
     api.keep_disks().create(body={"keep_disk": {"keep_service_uuid": s1["uuid"] } }).execute()
     api.keep_disks().create(body={"keep_disk": {"keep_service_uuid": s2["uuid"] } }).execute()
 

commit fa3e996ee452bd9be853dd9e93aaec15623708f5
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Nov 11 15:02:07 2014 -0500

    2853: Update tests to survive ServiceRoots being a map[string]string.

diff --git a/services/keepproxy/keepproxy_test.go b/services/keepproxy/keepproxy_test.go
index f6d163c..88ac8a6 100644
--- a/services/keepproxy/keepproxy_test.go
+++ b/services/keepproxy/keepproxy_test.go
@@ -143,11 +143,14 @@ func runProxy(c *C, args []string, token string, port int) keepclient.KeepClient
 	os.Setenv("ARVADOS_KEEP_PROXY", fmt.Sprintf("http://localhost:%v", port))
 	os.Setenv("ARVADOS_API_TOKEN", token)
 	arv, err := arvadosclient.MakeArvadosClient()
+	c.Assert(err, Equals, nil)
 	kc, err := keepclient.MakeKeepClient(&arv)
+	c.Assert(err, Equals, nil)
 	c.Check(kc.Using_proxy, Equals, true)
 	c.Check(len(kc.ServiceRoots()), Equals, 1)
-	c.Check(kc.ServiceRoots()[0], Equals, fmt.Sprintf("http://localhost:%v", port))
-	c.Check(err, Equals, nil)
+	for _, root := range(kc.ServiceRoots()) {
+		c.Check(root, Equals, fmt.Sprintf("http://localhost:%v", port))
+	}
 	os.Setenv("ARVADOS_KEEP_PROXY", "")
 	log.Print("keepclient created")
 	return kc
@@ -165,12 +168,15 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
 
 	os.Setenv("ARVADOS_EXTERNAL_CLIENT", "true")
 	arv, err := arvadosclient.MakeArvadosClient()
+	c.Assert(err, Equals, nil)
 	kc, err := keepclient.MakeKeepClient(&arv)
+	c.Assert(err, Equals, nil)
 	c.Check(kc.Arvados.External, Equals, true)
 	c.Check(kc.Using_proxy, Equals, true)
 	c.Check(len(kc.ServiceRoots()), Equals, 1)
-	c.Check(kc.ServiceRoots()[0], Equals, "http://localhost:29950")
-	c.Check(err, Equals, nil)
+	for _, root := range kc.ServiceRoots() {
+		c.Check(root, Equals, "http://localhost:29950")
+	}
 	os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
 	log.Print("keepclient created")
 

commit 1304e044aa87a65145bf8b6d4bc141586556c0ed
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Nov 11 14:31:49 2014 -0500

    2853: Add tests for reference set and some edge cases.

diff --git a/sdk/go/keepclient/root_sorter_test.go b/sdk/go/keepclient/root_sorter_test.go
new file mode 100644
index 0000000..0d45e90
--- /dev/null
+++ b/sdk/go/keepclient/root_sorter_test.go
@@ -0,0 +1,61 @@
+package keepclient
+
+import (
+	"crypto/md5"
+	"fmt"
+	. "gopkg.in/check.v1"
+	"strconv"
+	"strings"
+)
+
+type RootSorterSuite struct{}
+var _ = Suite(&RootSorterSuite{})
+
+func FakeSvcRoot(i uint64) (string) {
+	return fmt.Sprintf("https://%x.svc/", i)
+}
+
+func FakeSvcUuid(i uint64) (string) {
+	return fmt.Sprintf("zzzzz-bi6l4-%015x", i)
+}
+
+func FakeServiceRoots(n uint64) (map[string]string) {
+	sr := map[string]string{}
+	for i := uint64(0); i < n; i ++ {
+		sr[FakeSvcUuid(i)] = FakeSvcRoot(i)
+	}
+	return sr
+}
+
+func Md5String(data string) (string) {
+	return fmt.Sprintf("%032x", md5.Sum([]byte(data)))
+}
+
+func (*RootSorterSuite) EmptyRoots(c *C) {
+	rs := NewRootSorter(map[string]string{}, Md5String("foo"))
+	c.Check(rs.GetSortedRoots(), Equals, []string{})
+}
+
+func (*RootSorterSuite) JustOneRoot(c *C) {
+	rs := NewRootSorter(FakeServiceRoots(1), Md5String("foo"))
+	c.Check(rs.GetSortedRoots(), Equals, []string{FakeSvcRoot(0)})
+}
+
+func (*RootSorterSuite) ReferenceSet(c *C) {
+	fakeroots := FakeServiceRoots(16)
+	expected_orders := []string{
+		"3eab2d5fc9681074",
+		"097dba52e648f1c3",
+		"c5b4e023f8a7d691",
+		"9d81c02e76a3bf54",
+	}
+	for h, expected_order := range expected_orders {
+		hash := Md5String(fmt.Sprintf("%064x", h))
+		roots := NewRootSorter(fakeroots, hash).GetSortedRoots()
+		for i, svc_id_s := range strings.Split(expected_order, "") {
+			svc_id, err := strconv.ParseUint(svc_id_s, 16, 64)
+			c.Assert(err, Equals, nil)
+			c.Check(roots[i], Equals, FakeSvcRoot(svc_id))
+		}
+	}
+}

commit 47b0cb35b5ee933757c4342bb75fd286c1dac8cb
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Nov 11 13:57:15 2014 -0500

    2853: Add "reference set" test to check probe order agreement between implementations.

diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 2cb0317..72bc741 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -1,6 +1,7 @@
 import hashlib
 import mock
 import os
+import re
 import socket
 import unittest
 import urlparse
@@ -21,11 +22,35 @@ class KeepRendezvousWeightTestCase(unittest.TestCase):
     def addServices(self, n):
         for x in range(n):
             uuid = "zzzzz-bi6l4-{:015x}".format(self.n_services)
-            uri = "https://[0.0.0.{}]:25107/".format(self.n_services)
+            uri = "https://keep0x{:x}.zzzzz.arvadosapi.com:25107/".format(self.n_services)
             self.keep_client._keep_services.append(
                 {'uuid': uuid, '_service_root': uri})
             self.n_services += 1
 
+    def test_ProbeOrderReferenceSet(self):
+        # expected_order[i] is the probe order for
+        # hash=md5(sprintf("%064x",i)) where there are 16 services
+        # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
+        # the first probe for the block consisting of 64 "0"
+        # characters is the service whose uuid is
+        # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
+        expected_order = [
+            list('3eab2d5fc9681074'),
+            list('097dba52e648f1c3'),
+            list('c5b4e023f8a7d691'),
+            list('9d81c02e76a3bf54'),
+            ]
+        hashes = [
+            hashlib.md5("{:064x}".format(x)).hexdigest()
+            for x in range(len(expected_order))]
+        self.addServices(16)
+        for i, hash in enumerate(hashes):
+            roots = self.keep_client.weighted_service_roots(hash)
+            got_order = [
+                re.search(r'//keep0x([0-9a-f]+)', root).group(1)
+                for root in roots]
+            self.assertEqual(got_order, expected_order[i])
+
     def test_ProbeWasteAddingOneServer(self):
         hashes = [
             hashlib.md5("{:064x}".format(x)).hexdigest() for x in range(100)]

commit e02ef893c4f6cf881e449c248782b2ac21b49b0f
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Nov 11 12:44:02 2014 -0500

    2853: Fix instance ID in test fixture hostnames. Clean up brittle tests.

diff --git a/services/api/test/fixtures/keep_services.yml b/services/api/test/fixtures/keep_services.yml
index 84ac316..f668cbc 100644
--- a/services/api/test/fixtures/keep_services.yml
+++ b/services/api/test/fixtures/keep_services.yml
@@ -1,7 +1,7 @@
 keep0:
   uuid: zzzzz-bi6l4-6zhilxar6r8ey90
   owner_uuid: zzzzz-tpzed-d9tiejq69daie8f
-  service_host: keep0.qr1hi.arvadosapi.com
+  service_host: keep0.zzzzz.arvadosapi.com
   service_port: 25107
   service_ssl_flag: false
   service_type: disk
@@ -9,7 +9,7 @@ keep0:
 keep1:
   uuid: zzzzz-bi6l4-rsnj3c76ndxb7o0
   owner_uuid: zzzzz-tpzed-d9tiejq69daie8f
-  service_host: keep1.qr1hi.arvadosapi.com
+  service_host: keep1.zzzzz.arvadosapi.com
   service_port: 25107
   service_ssl_flag: false
   service_type: disk
@@ -17,7 +17,7 @@ keep1:
 proxy:
   uuid: zzzzz-bi6l4-h0a0xwut9qa6g3a
   owner_uuid: zzzzz-tpzed-d9tiejq69daie8f
-  service_host: keep.qr1hi.arvadosapi.com
+  service_host: keep.zzzzz.arvadosapi.com
   service_port: 25333
   service_ssl_flag: true
   service_type: proxy
diff --git a/services/api/test/integration/keep_proxy_test.rb b/services/api/test/integration/keep_proxy_test.rb
index d4155c2..aacda51 100644
--- a/services/api/test/integration/keep_proxy_test.rb
+++ b/services/api/test/integration/keep_proxy_test.rb
@@ -6,20 +6,23 @@ class KeepProxyTest < ActionDispatch::IntegrationTest
     assert_response :success
     services = json_response['items']
 
-    assert_equal 2, services.length
-    assert_equal 'disk', services[0]['service_type']
-    assert_equal 'disk', services[1]['service_type']
+    assert_operator 2, :<=, services.length
+    services.each do |service|
+      assert_equal 'disk', service['service_type']
+    end
+  end
 
+  test "request keep proxy" do
     get "/arvados/v1/keep_services/accessible", {:format => :json}, auth(:active).merge({'HTTP_X_EXTERNAL_CLIENT' => '1'})
     assert_response :success
     services = json_response['items']
 
     assert_equal 1, services.length
 
-    assert_equal "zzzzz-bi6l4-h0a0xwut9qa6g3a", services[0]['uuid']
-    assert_equal "keep.qr1hi.arvadosapi.com", services[0]['service_host']
-    assert_equal 25333, services[0]['service_port']
-    assert_equal true, services[0]['service_ssl_flag']
+    assert_equal keep_services(:proxy).uuid, services[0]['uuid']
+    assert_equal keep_services(:proxy).service_host, services[0]['service_host']
+    assert_equal keep_services(:proxy).service_port, services[0]['service_port']
+    assert_equal keep_services(:proxy).service_ssl_flag, services[0]['service_ssl_flag']
     assert_equal 'proxy', services[0]['service_type']
   end
 end

commit 9405b5f224b205e681e378bd2d90d18f3638139b
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Nov 11 12:11:08 2014 -0500

    2853: Use rendezvous hashing to select probe order in Python library.

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 46bd1cb..4c288f9 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -22,15 +22,15 @@ import datetime
 import ssl
 import socket
 
-_logger = logging.getLogger('arvados.keep')
-global_client_object = None
-
 import arvados
 import arvados.config as config
 import arvados.errors
 import arvados.retry as retry
 import arvados.util
 
+_logger = logging.getLogger('arvados.keep')
+global_client_object = None
+
 class KeepLocator(object):
     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
@@ -289,7 +289,7 @@ class KeepClient(object):
         def last_status(self):
             try:
                 return int(self.last_result[0].status)
-            except (AttributeError, IndexError, ValueError):
+            except (AttributeError, IndexError, ValueError, TypeError):
                 return None
 
         def get(self, http, locator):
@@ -448,9 +448,12 @@ class KeepClient(object):
                 if not proxy.endswith('/'):
                     proxy += '/'
                 self.api_token = api_token
-                self.service_roots = [proxy]
+                self._keep_services = [{
+                    'uuid': 'proxy',
+                    '_service_root': proxy,
+                    }]
                 self.using_proxy = True
-                self.static_service_roots = True
+                self._static_services_list = True
             else:
                 # It's important to avoid instantiating an API client
                 # unless we actually need one, for testing's sake.
@@ -458,13 +461,13 @@ class KeepClient(object):
                     api_client = arvados.api('v1')
                 self.api_client = api_client
                 self.api_token = api_client.api_token
-                self.service_roots = None
+                self._keep_services = None
                 self.using_proxy = None
-                self.static_service_roots = False
+                self._static_services_list = False
 
-    def build_service_roots(self, force_rebuild=False):
-        if (self.static_service_roots or
-              (self.service_roots and not force_rebuild)):
+    def build_services_list(self, force_rebuild=False):
+        if (self._static_services_list or
+              (self._keep_services and not force_rebuild)):
             return
         with self.lock:
             try:
@@ -472,68 +475,47 @@ class KeepClient(object):
             except Exception:  # API server predates Keep services.
                 keep_services = self.api_client.keep_disks().list()
 
-            keep_services = keep_services.execute().get('items')
-            if not keep_services:
+            self._keep_services = keep_services.execute().get('items')
+            if not self._keep_services:
                 raise arvados.errors.NoKeepServersError()
 
             self.using_proxy = any(ks.get('service_type') == 'proxy'
-                                   for ks in keep_services)
-
-            roots = ("{}://[{}]:{:d}/".format(
-                        'https' if ks['service_ssl_flag'] else 'http',
-                         ks['service_host'],
-                         ks['service_port'])
-                     for ks in keep_services)
-            self.service_roots = sorted(set(roots))
-            _logger.debug(str(self.service_roots))
-
-    def shuffled_service_roots(self, hash, force_rebuild=False):
-        self.build_service_roots(force_rebuild)
-
-        # Build an ordering with which to query the Keep servers based on the
-        # contents of the hash.
-        # "hash" is a hex-encoded number at least 8 digits
-        # (32 bits) long
-
-        # seed used to calculate the next keep server from 'pool'
-        # to be added to 'pseq'
-        seed = hash
-
-        # Keep servers still to be added to the ordering
-        pool = self.service_roots[:]
-
-        # output probe sequence
-        pseq = []
-
-        # iterate while there are servers left to be assigned
-        while len(pool) > 0:
-            if len(seed) < 8:
-                # ran out of digits in the seed
-                if len(pseq) < len(hash) / 4:
-                    # the number of servers added to the probe sequence is less
-                    # than the number of 4-digit slices in 'hash' so refill the
-                    # seed with the last 4 digits and then append the contents
-                    # of 'hash'.
-                    seed = hash[-4:] + hash
-                else:
-                    # refill the seed with the contents of 'hash'
-                    seed += hash
-
-            # Take the next 8 digits (32 bytes) and interpret as an integer,
-            # then modulus with the size of the remaining pool to get the next
-            # selected server.
-            probe = int(seed[0:8], 16) % len(pool)
-
-            # Append the selected server to the probe sequence and remove it
-            # from the pool.
-            pseq += [pool[probe]]
-            pool = pool[:probe] + pool[probe+1:]
-
-            # Remove the digits just used from the seed
-            seed = seed[8:]
-        _logger.debug(str(pseq))
-        return pseq
+                                   for ks in self._keep_services)
+
+            # Precompute the base URI for each service.
+            for r in self._keep_services:
+                r['_service_root'] = "{}://[{}]:{:d}/".format(
+                    'https' if r['service_ssl_flag'] else 'http',
+                    r['service_host'],
+                    r['service_port'])
+            _logger.debug(str(self._keep_services))
+
+    def _service_weight(self, hash, service_uuid):
+        """Compute the weight of a Keep service endpoint for a data
+        block with a known hash.
+
+        The weight is md5(h + u) where u is the last 15 characters of
+        the service endpoint's UUID.
+        """
+        return hashlib.md5(hash + service_uuid[-15:]).hexdigest()
 
+    def weighted_service_roots(self, hash, force_rebuild=False):
+        """Return an array of Keep service endpoints, in the order in
+        which they should be probed when reading or writing data with
+        the given hash.
+        """
+        self.build_services_list(force_rebuild)
+
+        # Sort the available services by weight (heaviest first) for
+        # this hash, and return their service_roots (base URIs) in
+        # that order.
+        sorted_roots = [
+            svc['_service_root'] for svc in sorted(
+                self._keep_services,
+                reverse=True,
+                key=lambda svc: self._service_weight(hash, svc['uuid']))]
+        _logger.debug(hash + ': ' + str(sorted_roots))
+        return sorted_roots
 
     def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
         # roots_map is a dictionary, mapping Keep service root strings
@@ -541,7 +523,7 @@ class KeepClient(object):
         # new ones to roots_map.  Return the current list of local
         # root strings.
         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
-        local_roots = self.shuffled_service_roots(md5_s, force_rebuild)
+        local_roots = self.weighted_service_roots(md5_s, force_rebuild)
         for root in local_roots:
             if root not in roots_map:
                 roots_map[root] = self.KeepService(root, **headers)
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index d07d6e1..2cb0317 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -1,3 +1,4 @@
+import hashlib
 import mock
 import os
 import socket
@@ -9,6 +10,60 @@ import arvados.retry
 import arvados_testutil as tutil
 import run_test_server
 
+class KeepRendezvousWeightTestCase(unittest.TestCase):
+    def setUp(self):
+        self.keep_client = arvados.KeepClient(
+            api_client=mock.MagicMock(name='api_client'),
+            proxy='', local_store='')
+        self.keep_client._keep_services = []
+        self.n_services = 0
+
+    def addServices(self, n):
+        for x in range(n):
+            uuid = "zzzzz-bi6l4-{:015x}".format(self.n_services)
+            uri = "https://[0.0.0.{}]:25107/".format(self.n_services)
+            self.keep_client._keep_services.append(
+                {'uuid': uuid, '_service_root': uri})
+            self.n_services += 1
+
+    def test_ProbeWasteAddingOneServer(self):
+        hashes = [
+            hashlib.md5("{:064x}".format(x)).hexdigest() for x in range(100)]
+        initial_services = 12
+        self.addServices(initial_services)
+        probes_before = [
+            self.keep_client.weighted_service_roots(hash) for hash in hashes]
+        for added_services in range(1, 12):
+            self.addServices(1)
+            total_penalty = 0
+            for hash_index in range(len(hashes)):
+                probe_after = self.keep_client.weighted_service_roots(
+                    hashes[hash_index])
+                penalty = probe_after.index(probes_before[hash_index][0])
+                self.assertLessEqual(penalty, added_services)
+                total_penalty += penalty
+            # Average penalty per block should not exceed
+            # N(added)/N(orig) by more than 20%, and should get closer
+            # to the ideal as we add data points.
+            expect_penalty = (
+                added_services *
+                len(hashes) / initial_services)
+            max_penalty = (
+                expect_penalty *
+                (120 - added_services)/100)
+            min_penalty = (
+                expect_penalty * 8/10)
+            self.assertTrue(
+                min_penalty <= total_penalty <= max_penalty,
+                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
+                    initial_services,
+                    added_services,
+                    len(hashes),
+                    total_penalty,
+                    min_penalty,
+                    max_penalty))
+
+
 class KeepTestCase(run_test_server.TestCaseWithServers):
     MAIN_SERVER = {}
     KEEP_SERVER = {}
@@ -249,7 +304,7 @@ class KeepClientServiceTestCase(unittest.TestCase):
     def get_service_roots(self, *services):
         api_client = self.mock_keep_services(*services)
         keep_client = arvados.KeepClient(api_client=api_client)
-        services = keep_client.shuffled_service_roots('000000')
+        services = keep_client.weighted_service_roots('000000')
         return [urlparse.urlparse(url) for url in sorted(services)]
 
     def test_ssl_flag_respected_in_roots(self):

commit a4b55a80251d666ca39d0f8d201c70e493b4c661
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Nov 11 05:14:21 2014 -0500

    2853: Use rendezvous hashing to select probe order.

diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index e1c25c9..326c2a0 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -12,7 +12,6 @@ import (
 	"log"
 	"net/http"
 	"regexp"
-	"sort"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -36,7 +35,7 @@ type KeepClient struct {
 	Arvados       *arvadosclient.ArvadosClient
 	Want_replicas int
 	Using_proxy   bool
-	service_roots *[]string
+	service_roots *map[string]string
 	lock          sync.Mutex
 	Client        *http.Client
 }
@@ -133,7 +132,7 @@ func (this KeepClient) AuthorizedGet(hash string,
 	contentLength int64, url string, err error) {
 
 	// Calculate the ordering for asking servers
-	sv := this.shuffledServiceRoots(hash)
+	sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
 
 	for _, host := range sv {
 		var req *http.Request
@@ -175,7 +174,7 @@ func (this KeepClient) Ask(hash string) (contentLength int64, url string, err er
 func (this KeepClient) AuthorizedAsk(hash string, signature string,
 	timestamp string) (contentLength int64, url string, err error) {
 	// Calculate the ordering for asking servers
-	sv := this.shuffledServiceRoots(hash)
+	sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
 
 	for _, host := range sv {
 		var req *http.Request
@@ -208,20 +207,19 @@ func (this KeepClient) AuthorizedAsk(hash string, signature string,
 }
 
 // Atomically read the service_roots field.
-func (this *KeepClient) ServiceRoots() []string {
-	r := (*[]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots))))
+func (this *KeepClient) ServiceRoots() map[string]string {
+	r := (*map[string]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots))))
 	return *r
 }
 
 // Atomically update the service_roots field.  Enables you to update
 // service_roots without disrupting any GET or PUT operations that might
 // already be in progress.
-func (this *KeepClient) SetServiceRoots(svc []string) {
-	// Must be sorted for ShuffledServiceRoots() to produce consistent
-	// results.
-	roots := make([]string, len(svc))
-	copy(roots, svc)
-	sort.Strings(roots)
+func (this *KeepClient) SetServiceRoots(new_roots map[string]string) {
+	roots := make(map[string]string)
+	for uuid, root := range new_roots {
+		roots[uuid] = root
+	}
 	atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)),
 		unsafe.Pointer(&roots))
 }
diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go
index 04be03b..608e714 100644
--- a/sdk/go/keepclient/keepclient_test.go
+++ b/sdk/go/keepclient/keepclient_test.go
@@ -87,20 +87,27 @@ func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
 
 	c.Assert(err, Equals, nil)
 	c.Check(len(kc.ServiceRoots()), Equals, 2)
-	c.Check(kc.ServiceRoots()[0], Equals, "http://localhost:25107")
-	c.Check(kc.ServiceRoots()[1], Equals, "http://localhost:25108")
+	for _, root := range kc.ServiceRoots() {
+		c.Check(root, Matches, "http://localhost:2510[\\d]")
+	}
 }
 
 func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
+	roots := map[string]string{
+		"zzzzz-bi6l4-2q7dq8becevdqfb": "http://localhost:1",
+		"zzzzz-bi6l4-4gbhck2w7lq0d96": "http://localhost:2",
+		"zzzzz-bi6l4-4bt69dsk0quh7ae": "http://localhost:3",
+		"zzzzz-bi6l4-62w1fgd0ud2krxl": "http://localhost:4",
+	}
 	kc := KeepClient{}
-	kc.SetServiceRoots([]string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"})
+	kc.SetServiceRoots(roots)
 
 	// "foo" acbd18db4cc2f85cedef654fccc4a4d8
-	foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
+	foo_shuffle := []string{"http://localhost:4", "http://localhost:1", "http://localhost:3", "http://localhost:2"}
 	c.Check(kc.shuffledServiceRoots("acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle)
 
 	// "bar" 37b51d194a7513e45b56f6524f2d51f2
-	bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"}
+	bar_shuffle := []string{"http://localhost:3", "http://localhost:2", "http://localhost:4", "http://localhost:1"}
 	c.Check(kc.shuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
 }
 
@@ -273,19 +280,19 @@ func (s *StandaloneSuite) TestPutB(c *C) {
 		hash,
 		"abc123",
 		"foo",
-		make(chan string, 2)}
+		make(chan string, 5)}
 
 	arv, _ := arvadosclient.MakeArvadosClient()
 	kc, _ := MakeKeepClient(&arv)
 
 	kc.Want_replicas = 2
 	arv.ApiToken = "abc123"
-	service_roots := make([]string, 5)
+	service_roots := make(map[string]string)
 
 	ks := RunSomeFakeKeepServers(st, 5, 2990)
 
 	for i := 0; i < len(ks); i += 1 {
-		service_roots[i] = ks[i].url
+		service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = ks[i].url
 		defer ks[i].listener.Close()
 	}
 
@@ -315,19 +322,19 @@ func (s *StandaloneSuite) TestPutHR(c *C) {
 		hash,
 		"abc123",
 		"foo",
-		make(chan string, 2)}
+		make(chan string, 5)}
 
 	arv, _ := arvadosclient.MakeArvadosClient()
 	kc, _ := MakeKeepClient(&arv)
 
 	kc.Want_replicas = 2
 	arv.ApiToken = "abc123"
-	service_roots := make([]string, 5)
+	service_roots := make(map[string]string)
 
 	ks := RunSomeFakeKeepServers(st, 5, 2990)
 
 	for i := 0; i < len(ks); i += 1 {
-		service_roots[i] = ks[i].url
+		service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = ks[i].url
 		defer ks[i].listener.Close()
 	}
 
@@ -366,7 +373,7 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) {
 		hash,
 		"abc123",
 		"foo",
-		make(chan string, 2)}
+		make(chan string, 4)}
 
 	fh := FailHandler{
 		make(chan string, 1)}
@@ -376,17 +383,17 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) {
 
 	kc.Want_replicas = 2
 	arv.ApiToken = "abc123"
-	service_roots := make([]string, 5)
+	service_roots := make(map[string]string)
 
 	ks1 := RunSomeFakeKeepServers(st, 4, 2990)
 	ks2 := RunSomeFakeKeepServers(fh, 1, 2995)
 
 	for i, k := range ks1 {
-		service_roots[i] = k.url
+		service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
 		defer k.listener.Close()
 	}
 	for i, k := range ks2 {
-		service_roots[len(ks1)+i] = k.url
+		service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
 		defer k.listener.Close()
 	}
 
@@ -425,29 +432,27 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
 
 	kc.Want_replicas = 2
 	arv.ApiToken = "abc123"
-	service_roots := make([]string, 5)
+	service_roots := make(map[string]string)
 
 	ks1 := RunSomeFakeKeepServers(st, 1, 2990)
 	ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
 
 	for i, k := range ks1 {
-		service_roots[i] = k.url
+		service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
 		defer k.listener.Close()
 	}
 	for i, k := range ks2 {
-		service_roots[len(ks1)+i] = k.url
+		service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
 		defer k.listener.Close()
 	}
 
 	kc.SetServiceRoots(service_roots)
 
-	shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
-
 	_, replicas, err := kc.PutB([]byte("foo"))
 
 	c.Check(err, Equals, InsufficientReplicasError)
 	c.Check(replicas, Equals, 1)
-	c.Check(<-st.handled, Equals, shuff[1])
+	c.Check(<-st.handled, Matches, ".*2990")
 
 	log.Printf("TestPutWithTooManyFail done")
 }
@@ -483,7 +488,7 @@ func (s *StandaloneSuite) TestGet(c *C) {
 	arv, err := arvadosclient.MakeArvadosClient()
 	kc, _ := MakeKeepClient(&arv)
 	arv.ApiToken = "abc123"
-	kc.SetServiceRoots([]string{url})
+	kc.SetServiceRoots(map[string]string{"x":url})
 
 	r, n, url2, err := kc.Get(hash)
 	defer r.Close()
@@ -509,7 +514,7 @@ func (s *StandaloneSuite) TestGetFail(c *C) {
 	arv, err := arvadosclient.MakeArvadosClient()
 	kc, _ := MakeKeepClient(&arv)
 	arv.ApiToken = "abc123"
-	kc.SetServiceRoots([]string{url})
+	kc.SetServiceRoots(map[string]string{"x":url})
 
 	r, n, url2, err := kc.Get(hash)
 	c.Check(err, Equals, BlockNotFound)
@@ -539,7 +544,7 @@ func (s *StandaloneSuite) TestChecksum(c *C) {
 	arv, err := arvadosclient.MakeArvadosClient()
 	kc, _ := MakeKeepClient(&arv)
 	arv.ApiToken = "abc123"
-	kc.SetServiceRoots([]string{url})
+	kc.SetServiceRoots(map[string]string{"x":url})
 
 	r, n, _, err := kc.Get(barhash)
 	_, err = ioutil.ReadAll(r)
@@ -557,46 +562,55 @@ func (s *StandaloneSuite) TestChecksum(c *C) {
 }
 
 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
-
-	hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+	content := []byte("waz")
+	hash := fmt.Sprintf("%x", md5.Sum(content))
 
 	fh := FailHandler{
-		make(chan string, 1)}
+		make(chan string, 4)}
 
 	st := StubGetHandler{
 		c,
 		hash,
 		"abc123",
-		[]byte("foo")}
+		content}
 
 	arv, err := arvadosclient.MakeArvadosClient()
 	kc, _ := MakeKeepClient(&arv)
 	arv.ApiToken = "abc123"
-	service_roots := make([]string, 5)
+	service_roots := make(map[string]string)
 
 	ks1 := RunSomeFakeKeepServers(st, 1, 2990)
 	ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
 
 	for i, k := range ks1 {
-		service_roots[i] = k.url
+		service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
 		defer k.listener.Close()
 	}
 	for i, k := range ks2 {
-		service_roots[len(ks1)+i] = k.url
+		service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
 		defer k.listener.Close()
 	}
 
 	kc.SetServiceRoots(service_roots)
 
+	// This test works only if one of the failing services is
+	// attempted before the succeeding service. Otherwise,
+	// <-fh.handled below will just hang! (Probe order depends on
+	// the choice of block content "waz" and the UUIDs of the fake
+	// servers, so we just tried different strings until we found
+	// an example that passes this Assert.)
+	c.Assert(NewRootSorter(service_roots, hash).GetSortedRoots()[0], Matches, ".*299[1-4]")
+
 	r, n, url2, err := kc.Get(hash)
+
 	<-fh.handled
 	c.Check(err, Equals, nil)
 	c.Check(n, Equals, int64(3))
 	c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
 
-	content, err2 := ioutil.ReadAll(r)
+	read_content, err2 := ioutil.ReadAll(r)
 	c.Check(err2, Equals, nil)
-	c.Check(content, DeepEquals, []byte("foo"))
+	c.Check(read_content, DeepEquals, content)
 }
 
 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
@@ -659,12 +673,12 @@ func (s *StandaloneSuite) TestPutProxy(c *C) {
 	kc.Want_replicas = 2
 	kc.Using_proxy = true
 	arv.ApiToken = "abc123"
-	service_roots := make([]string, 1)
+	service_roots := make(map[string]string)
 
 	ks1 := RunSomeFakeKeepServers(st, 1, 2990)
 
 	for i, k := range ks1 {
-		service_roots[i] = k.url
+		service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
 		defer k.listener.Close()
 	}
 
@@ -690,12 +704,12 @@ func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
 	kc.Want_replicas = 3
 	kc.Using_proxy = true
 	arv.ApiToken = "abc123"
-	service_roots := make([]string, 1)
+	service_roots := make(map[string]string)
 
 	ks1 := RunSomeFakeKeepServers(st, 1, 2990)
 
 	for i, k := range ks1 {
-		service_roots[i] = k.url
+		service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
 		defer k.listener.Close()
 	}
 	kc.SetServiceRoots(service_roots)
diff --git a/sdk/go/keepclient/root_sorter.go b/sdk/go/keepclient/root_sorter.go
new file mode 100644
index 0000000..7a73d95
--- /dev/null
+++ b/sdk/go/keepclient/root_sorter.go
@@ -0,0 +1,61 @@
+package keepclient
+
+import (
+	"crypto/md5"
+	"fmt"
+	"sort"
+)
+
+type RootSorter struct {
+	root         []string
+	weight       []string
+	order        []int
+}
+
+func NewRootSorter(serviceRoots map[string]string, hash string) (*RootSorter) {
+	rs := new(RootSorter)
+	rs.root = make([]string, len(serviceRoots))
+	rs.weight = make([]string, len(serviceRoots))
+	rs.order = make([]int, len(serviceRoots))
+	i := 0
+	for uuid, root := range serviceRoots {
+		rs.root[i] = root
+		rs.weight[i] = rs.getWeight(hash, uuid)
+		rs.order[i] = i
+		i++
+	}
+	sort.Sort(rs)
+	return rs
+}
+
+func (rs RootSorter) getWeight(hash string, uuid string) (string) {
+	var service_key []byte
+	if len(uuid) == 27 {
+		service_key = []byte(hash + uuid[12:])
+	} else {
+		// Only useful for testing, a set of one service root, etc.
+		service_key = []byte(hash + uuid)
+	}
+	return fmt.Sprintf("%x", md5.Sum(service_key))
+}
+
+func (rs RootSorter) GetSortedRoots() ([]string) {
+	sorted := make([]string, len(rs.order))
+	for i := range rs.order {
+		sorted[i] = rs.root[rs.order[i]]
+	}
+	return sorted
+}
+
+// Less is really More here: the heaviest root will be at the front of the list.
+func (rs RootSorter) Less(i, j int) bool {
+	return rs.weight[rs.order[j]] < rs.weight[rs.order[i]]
+}
+
+func (rs RootSorter) Len() int {
+	return len(rs.order)
+}
+
+func (rs RootSorter) Swap(i, j int) {
+	sort.IntSlice(rs.order).Swap(i, j)
+}
diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go
index ce15ce9..e51dcb3 100644
--- a/sdk/go/keepclient/support.go
+++ b/sdk/go/keepclient/support.go
@@ -10,11 +10,11 @@ import (
 	"log"
 	"net/http"
 	"os"
-	"strconv"
 	"strings"
 )
 
 type keepDisk struct {
+	Uuid     string `json:"uuid"`
 	Hostname string `json:"service_host"`
 	Port     int    `json:"service_port"`
 	SSL      bool   `json:"service_ssl_flag"`
@@ -23,7 +23,8 @@ type keepDisk struct {
 
 func (this *KeepClient) DiscoverKeepServers() error {
 	if prx := os.Getenv("ARVADOS_KEEP_PROXY"); prx != "" {
-		this.SetServiceRoots([]string{prx})
+		sr := map[string]string{"proxy":prx}
+		this.SetServiceRoots(sr)
 		this.Using_proxy = true
 		return nil
 	}
@@ -42,7 +43,7 @@ func (this *KeepClient) DiscoverKeepServers() error {
 	}
 
 	listed := make(map[string]bool)
-	service_roots := make([]string, 0, len(m.Items))
+	service_roots := make(map[string]string)
 
 	for _, element := range m.Items {
 		n := ""
@@ -57,7 +58,7 @@ func (this *KeepClient) DiscoverKeepServers() error {
 		// Skip duplicates
 		if !listed[url] {
 			listed[url] = true
-			service_roots = append(service_roots, url)
+			service_roots[element.Uuid] = url
 		}
 		if element.SvcType == "proxy" {
 			this.Using_proxy = true
@@ -70,52 +71,7 @@ func (this *KeepClient) DiscoverKeepServers() error {
 }
 
 func (this KeepClient) shuffledServiceRoots(hash string) (pseq []string) {
-	// Build an ordering with which to query the Keep servers based on the
-	// contents of the hash.  "hash" is a hex-encoded number at least 8
-	// digits (32 bits) long
-
-	// seed used to calculate the next keep server from 'pool' to be added
-	// to 'pseq'
-	seed := hash
-
-	// Keep servers still to be added to the ordering
-	service_roots := this.ServiceRoots()
-	pool := make([]string, len(service_roots))
-	copy(pool, service_roots)
-
-	// output probe sequence
-	pseq = make([]string, 0, len(service_roots))
-
-	// iterate while there are servers left to be assigned
-	for len(pool) > 0 {
-
-		if len(seed) < 8 {
-			// ran out of digits in the seed
-			if len(pseq) < (len(hash) / 4) {
-				// the number of servers added to the probe
-				// sequence is less than the number of 4-digit
-				// slices in 'hash' so refill the seed with the
-				// last 4 digits.
-				seed = hash[len(hash)-4:]
-			}
-			seed += hash
-		}
-
-		// Take the next 8 digits (32 bytes) and interpret as an integer,
-		// then modulus with the size of the remaining pool to get the next
-		// selected server.
-		probe, _ := strconv.ParseUint(seed[0:8], 16, 32)
-		probe %= uint64(len(pool))
-
-		// Append the selected server to the probe sequence and remove it
-		// from the pool.
-		pseq = append(pseq, pool[probe])
-		pool = append(pool[:probe], pool[probe+1:]...)
-
-		// Remove the digits just used from the seed
-		seed = seed[8:]
-	}
-	return pseq
+	return NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
 }
 
 type uploadStatus struct {
@@ -189,7 +145,7 @@ func (this KeepClient) putReplicas(
 	expectedLength int64) (locator string, replicas int, err error) {
 
 	// Calculate the ordering for uploading to servers
-	sv := this.shuffledServiceRoots(hash)
+	sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
 
 	// The next server to try contacting
 	next_server := 0

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list