[ARVADOS] updated: 12f8c88325daf4c6af8cbf091ea64cc5d64566c0

git at public.curoverse.com git at public.curoverse.com
Thu May 22 14:52:02 EDT 2014


Summary of changes:
 sdk/go/src/arvados.org/keepclient/keepclient.go    | 28 +++++++++-
 .../src/arvados.org/keepclient/keepclient_test.go  | 63 +++++++++++-----------
 sdk/go/src/arvados.org/keepclient/support.go       | 21 ++++----
 .../keep/src/arvados.org/keepproxy/keepproxy.go    | 34 ++++++++----
 .../src/arvados.org/keepproxy/keepproxy_test.go    |  8 +--
 5 files changed, 96 insertions(+), 58 deletions(-)

       via  12f8c88325daf4c6af8cbf091ea64cc5d64566c0 (commit)
      from  575457ac8645c61ca71e94ce074291ec002b4c24 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 12f8c88325daf4c6af8cbf091ea64cc5d64566c0
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu May 22 14:51:56 2014 -0400

    1885: Made ServiceRoots atomically updatable, so that KeepProxy can poll for
    changes in the services list without disrupting any active flows.

diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go
index 91989bd..dadf8bf 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@ -11,6 +11,10 @@ import (
 	"io/ioutil"
 	"net/http"
 	"os"
+	"sort"
+	"sync"
+	"sync/atomic"
+	"unsafe"
 )
 
 // A Keep "block" is 64MB.
@@ -25,11 +29,12 @@ type KeepClient struct {
 	ApiServer     string
 	ApiToken      string
 	ApiInsecure   bool
-	Service_roots []string
 	Want_replicas int
 	Client        *http.Client
 	Using_proxy   bool
 	External      bool
+	service_roots *[]string
+	lock          sync.Mutex
 }
 
 // Create a new KeepClient, initialized with standard Arvados environment
@@ -50,7 +55,7 @@ func MakeKeepClient() (kc KeepClient, err error) {
 		Using_proxy: false,
 		External:    external}
 
-	err = (&kc).discoverKeepServers()
+	err = (&kc).DiscoverKeepServers()
 
 	return kc, err
 }
@@ -207,3 +212,22 @@ func (this KeepClient) AuthorizedAsk(hash string, signature string,
 	return 0, "", BlockNotFound
 
 }
+
+// Atomically read the service_roots field.
+func (this *KeepClient) ServiceRoots() []string {
+	r := (*[]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots))))
+	return *r
+}
+
+// Atomically update the service_roots field.  Enables you to update
+// service_roots without disrupting any GET or PUT operations that might
+// already be in progress.
+func (this *KeepClient) SetServiceRoots(svc []string) {
+	// Must be sorted for ShuffledServiceRoots() to produce consistent
+	// results.
+	roots := make([]string, len(svc))
+	copy(roots, svc)
+	sort.Strings(roots)
+	atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)),
+		unsafe.Pointer(&roots))
+}
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
index 600d739..395603d 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
@@ -13,7 +13,6 @@ import (
 	"net/http"
 	"os"
 	"os/exec"
-	"sort"
 	"strings"
 	"testing"
 )
@@ -75,13 +74,14 @@ func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
 	c.Check(kc.Client.Transport.(*http.Transport).TLSClientConfig.InsecureSkipVerify, Equals, true)
 
 	c.Assert(err, Equals, nil)
-	c.Check(len(kc.Service_roots), Equals, 2)
-	c.Check(kc.Service_roots[0], Equals, "http://localhost:25107")
-	c.Check(kc.Service_roots[1], Equals, "http://localhost:25108")
+	c.Check(len(kc.ServiceRoots()), Equals, 2)
+	c.Check(kc.ServiceRoots()[0], Equals, "http://localhost:25107")
+	c.Check(kc.ServiceRoots()[1], Equals, "http://localhost:25108")
 }
 
 func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
-	kc := KeepClient{Service_roots: []string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"}}
+	kc := KeepClient{}
+	kc.SetServiceRoots([]string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"})
 
 	// "foo" acbd18db4cc2f85cedef654fccc4a4d8
 	foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
@@ -265,16 +265,16 @@ func (s *StandaloneSuite) TestPutB(c *C) {
 
 	kc.Want_replicas = 2
 	kc.ApiToken = "abc123"
-	kc.Service_roots = make([]string, 5)
+	service_roots := make([]string, 5)
 
 	ks := RunSomeFakeKeepServers(st, 5, 2990)
 
 	for i := 0; i < len(ks); i += 1 {
-		kc.Service_roots[i] = ks[i].url
+		service_roots[i] = ks[i].url
 		defer ks[i].listener.Close()
 	}
 
-	sort.Strings(kc.Service_roots)
+	kc.SetServiceRoots(service_roots)
 
 	kc.PutB([]byte("foo"))
 
@@ -306,16 +306,16 @@ func (s *StandaloneSuite) TestPutHR(c *C) {
 
 	kc.Want_replicas = 2
 	kc.ApiToken = "abc123"
-	kc.Service_roots = make([]string, 5)
+	service_roots := make([]string, 5)
 
 	ks := RunSomeFakeKeepServers(st, 5, 2990)
 
 	for i := 0; i < len(ks); i += 1 {
-		kc.Service_roots[i] = ks[i].url
+		service_roots[i] = ks[i].url
 		defer ks[i].listener.Close()
 	}
 
-	sort.Strings(kc.Service_roots)
+	kc.SetServiceRoots(service_roots)
 
 	reader, writer := io.Pipe()
 
@@ -359,21 +359,21 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) {
 
 	kc.Want_replicas = 2
 	kc.ApiToken = "abc123"
-	kc.Service_roots = make([]string, 5)
+	service_roots := make([]string, 5)
 
 	ks1 := RunSomeFakeKeepServers(st, 4, 2990)
 	ks2 := RunSomeFakeKeepServers(fh, 1, 2995)
 
 	for i, k := range ks1 {
-		kc.Service_roots[i] = k.url
+		service_roots[i] = k.url
 		defer k.listener.Close()
 	}
 	for i, k := range ks2 {
-		kc.Service_roots[len(ks1)+i] = k.url
+		service_roots[len(ks1)+i] = k.url
 		defer k.listener.Close()
 	}
 
-	sort.Strings(kc.Service_roots)
+	kc.SetServiceRoots(service_roots)
 
 	shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
 
@@ -407,21 +407,21 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
 
 	kc.Want_replicas = 2
 	kc.ApiToken = "abc123"
-	kc.Service_roots = make([]string, 5)
+	service_roots := make([]string, 5)
 
 	ks1 := RunSomeFakeKeepServers(st, 1, 2990)
 	ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
 
 	for i, k := range ks1 {
-		kc.Service_roots[i] = k.url
+		service_roots[i] = k.url
 		defer k.listener.Close()
 	}
 	for i, k := range ks2 {
-		kc.Service_roots[len(ks1)+i] = k.url
+		service_roots[len(ks1)+i] = k.url
 		defer k.listener.Close()
 	}
 
-	sort.Strings(kc.Service_roots)
+	kc.SetServiceRoots(service_roots)
 
 	shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
 
@@ -464,7 +464,7 @@ func (s *StandaloneSuite) TestGet(c *C) {
 
 	kc, _ := MakeKeepClient()
 	kc.ApiToken = "abc123"
-	kc.Service_roots = []string{url}
+	kc.SetServiceRoots([]string{url})
 
 	r, n, url2, err := kc.Get(hash)
 	defer r.Close()
@@ -489,7 +489,7 @@ func (s *StandaloneSuite) TestGetFail(c *C) {
 
 	kc, _ := MakeKeepClient()
 	kc.ApiToken = "abc123"
-	kc.Service_roots = []string{url}
+	kc.SetServiceRoots([]string{url})
 
 	r, n, url2, err := kc.Get(hash)
 	c.Check(err, Equals, BlockNotFound)
@@ -518,7 +518,7 @@ func (s *StandaloneSuite) TestChecksum(c *C) {
 
 	kc, _ := MakeKeepClient()
 	kc.ApiToken = "abc123"
-	kc.Service_roots = []string{url}
+	kc.SetServiceRoots([]string{url})
 
 	r, n, _, err := kc.Get(barhash)
 	_, err = ioutil.ReadAll(r)
@@ -550,21 +550,21 @@ func (s *StandaloneSuite) TestGetWithFailures(c *C) {
 
 	kc, _ := MakeKeepClient()
 	kc.ApiToken = "abc123"
-	kc.Service_roots = make([]string, 5)
+	service_roots := make([]string, 5)
 
 	ks1 := RunSomeFakeKeepServers(st, 1, 2990)
 	ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
 
 	for i, k := range ks1 {
-		kc.Service_roots[i] = k.url
+		service_roots[i] = k.url
 		defer k.listener.Close()
 	}
 	for i, k := range ks2 {
-		kc.Service_roots[len(ks1)+i] = k.url
+		service_roots[len(ks1)+i] = k.url
 		defer k.listener.Close()
 	}
 
-	sort.Strings(kc.Service_roots)
+	kc.SetServiceRoots(service_roots)
 
 	r, n, url2, err := kc.Get(hash)
 	<-fh.handled
@@ -635,15 +635,17 @@ func (s *StandaloneSuite) TestPutProxy(c *C) {
 	kc.Want_replicas = 2
 	kc.Using_proxy = true
 	kc.ApiToken = "abc123"
-	kc.Service_roots = make([]string, 1)
+	service_roots := make([]string, 1)
 
 	ks1 := RunSomeFakeKeepServers(st, 1, 2990)
 
 	for i, k := range ks1 {
-		kc.Service_roots[i] = k.url
+		service_roots[i] = k.url
 		defer k.listener.Close()
 	}
 
+	kc.SetServiceRoots(service_roots)
+
 	_, replicas, err := kc.PutB([]byte("foo"))
 	<-st.handled
 
@@ -663,14 +665,15 @@ func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
 	kc.Want_replicas = 3
 	kc.Using_proxy = true
 	kc.ApiToken = "abc123"
-	kc.Service_roots = make([]string, 1)
+	service_roots := make([]string, 1)
 
 	ks1 := RunSomeFakeKeepServers(st, 1, 2990)
 
 	for i, k := range ks1 {
-		kc.Service_roots[i] = k.url
+		service_roots[i] = k.url
 		defer k.listener.Close()
 	}
+	kc.SetServiceRoots(service_roots)
 
 	_, replicas, err := kc.PutB([]byte("foo"))
 	<-st.handled
diff --git a/sdk/go/src/arvados.org/keepclient/support.go b/sdk/go/src/arvados.org/keepclient/support.go
index ef4a8e1..38669a1 100644
--- a/sdk/go/src/arvados.org/keepclient/support.go
+++ b/sdk/go/src/arvados.org/keepclient/support.go
@@ -10,7 +10,6 @@ import (
 	"log"
 	"net/http"
 	"os"
-	"sort"
 	"strconv"
 )
 
@@ -21,10 +20,9 @@ type keepDisk struct {
 	SvcType  string `json:"service_type"`
 }
 
-func (this *KeepClient) discoverKeepServers() error {
+func (this *KeepClient) DiscoverKeepServers() error {
 	if prx := os.Getenv("ARVADOS_KEEP_PROXY"); prx != "" {
-		this.Service_roots = make([]string, 1)
-		this.Service_roots[0] = prx
+		this.SetServiceRoots([]string{prx})
 		this.Using_proxy = true
 		return nil
 	}
@@ -72,7 +70,7 @@ func (this *KeepClient) discoverKeepServers() error {
 	}
 
 	listed := make(map[string]bool)
-	this.Service_roots = make([]string, 0, len(m.Items))
+	service_roots := make([]string, 0, len(m.Items))
 
 	for _, element := range m.Items {
 		n := ""
@@ -87,16 +85,14 @@ func (this *KeepClient) discoverKeepServers() error {
 		// Skip duplicates
 		if !listed[url] {
 			listed[url] = true
-			this.Service_roots = append(this.Service_roots, url)
+			service_roots = append(service_roots, url)
 		}
 		if element.SvcType == "proxy" {
 			this.Using_proxy = true
 		}
 	}
 
-	// Must be sorted for ShuffledServiceRoots() to produce consistent
-	// results.
-	sort.Strings(this.Service_roots)
+	this.SetServiceRoots(service_roots)
 
 	return nil
 }
@@ -111,11 +107,12 @@ func (this KeepClient) shuffledServiceRoots(hash string) (pseq []string) {
 	seed := hash
 
 	// Keep servers still to be added to the ordering
-	pool := make([]string, len(this.Service_roots))
-	copy(pool, this.Service_roots)
+	service_roots := this.ServiceRoots()
+	pool := make([]string, len(service_roots))
+	copy(pool, service_roots)
 
 	// output probe sequence
-	pseq = make([]string, 0, len(this.Service_roots))
+	pseq = make([]string, 0, len(service_roots))
 
 	// iterate while there are servers left to be assigned
 	for len(pool) > 0 {
diff --git a/services/keep/src/arvados.org/keepproxy/keepproxy.go b/services/keep/src/arvados.org/keepproxy/keepproxy.go
index ed33ac9..b914f47 100644
--- a/services/keep/src/arvados.org/keepproxy/keepproxy.go
+++ b/services/keep/src/arvados.org/keepproxy/keepproxy.go
@@ -92,8 +92,10 @@ func main() {
 		return
 	}
 
+	go RefreshServicesList(&kc)
+
 	// Start listening for requests.
-	http.Serve(listener, MakeRESTRouter(!no_get, !no_put, kc))
+	http.Serve(listener, MakeRESTRouter(!no_get, !no_put, &kc))
 }
 
 type ApiTokenCache struct {
@@ -102,6 +104,14 @@ type ApiTokenCache struct {
 	expireTime int64
 }
 
+// Refresh the keep service list every five minutes.
+func RefreshServicesList(kc *keepclient.KeepClient) {
+	for {
+		time.Sleep(300 * time.Second)
+		kc.DiscoverKeepServers()
+	}
+}
+
 // Cache the token and set an expire time.  If we already have an expire time
 // on the token, it is not updated.
 func (this *ApiTokenCache) RememberToken(token string) {
@@ -181,12 +191,12 @@ func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, re
 }
 
 type GetBlockHandler struct {
-	keepclient.KeepClient
+	*keepclient.KeepClient
 	*ApiTokenCache
 }
 
 type PutBlockHandler struct {
-	keepclient.KeepClient
+	*keepclient.KeepClient
 	*ApiTokenCache
 }
 
@@ -197,7 +207,7 @@ type PutBlockHandler struct {
 func MakeRESTRouter(
 	enable_get bool,
 	enable_put bool,
-	kc keepclient.KeepClient) *mux.Router {
+	kc *keepclient.KeepClient) *mux.Router {
 
 	t := &ApiTokenCache{tokens: make(map[string]int64), expireTime: 300}
 
@@ -222,7 +232,9 @@ func MakeRESTRouter(
 
 func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 
-	if !CheckAuthorizationHeader(this.KeepClient, this.ApiTokenCache, req) {
+	kc := *this.KeepClient
+
+	if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
 		http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
 	}
 
@@ -235,10 +247,10 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 	var blocklen int64
 
 	if req.Method == "GET" {
-		reader, blocklen, _, err = this.KeepClient.AuthorizedGet(hash, signature, timestamp)
+		reader, blocklen, _, err = kc.AuthorizedGet(hash, signature, timestamp)
 		defer reader.Close()
 	} else if req.Method == "HEAD" {
-		blocklen, _, err = this.KeepClient.AuthorizedAsk(hash, signature, timestamp)
+		blocklen, _, err = kc.AuthorizedAsk(hash, signature, timestamp)
 	}
 
 	resp.Header().Set("Content-Length", fmt.Sprint(blocklen))
@@ -259,7 +271,9 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 
 	log.Print("PutBlockHandler start")
 
-	if !CheckAuthorizationHeader(this.KeepClient, this.ApiTokenCache, req) {
+	kc := *this.KeepClient
+
+	if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
 		http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
 	}
 
@@ -284,12 +298,12 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 		var r int
 		_, err := fmt.Sscanf(req.Header.Get("X-Keep-Desired-Replicas"), "%d", &r)
 		if err != nil {
-			this.KeepClient.Want_replicas = r
+			kc.Want_replicas = r
 		}
 	}
 
 	// Now try to put the block through
-	replicas, err := this.KeepClient.PutHR(hash, req.Body, contentLength)
+	replicas, err := kc.PutHR(hash, req.Body, contentLength)
 
 	log.Printf("Replicas stored: %v err: %v", replicas, err)
 
diff --git a/services/keep/src/arvados.org/keepproxy/keepproxy_test.go b/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
index af1377b..d8abda7 100644
--- a/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
+++ b/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
@@ -109,8 +109,8 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
 	kc, err := keepclient.MakeKeepClient()
 	c.Check(kc.External, Equals, true)
 	c.Check(kc.Using_proxy, Equals, true)
-	c.Check(len(kc.Service_roots), Equals, 1)
-	c.Check(kc.Service_roots[0], Equals, "http://localhost:29950")
+	c.Check(len(kc.ServiceRoots()), Equals, 1)
+	c.Check(kc.ServiceRoots()[0], Equals, "http://localhost:29950")
 	c.Check(err, Equals, nil)
 	os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
 
@@ -159,8 +159,8 @@ func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
 	kc.ApiToken = "123xyz"
 	c.Check(kc.External, Equals, true)
 	c.Check(kc.Using_proxy, Equals, true)
-	c.Check(len(kc.Service_roots), Equals, 1)
-	c.Check(kc.Service_roots[0], Equals, "http://localhost:29950")
+	c.Check(len(kc.ServiceRoots()), Equals, 1)
+	c.Check(kc.ServiceRoots()[0], Equals, "http://localhost:29950")
 	c.Check(err, Equals, nil)
 	os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list