[ARVADOS] updated: 12f8c88325daf4c6af8cbf091ea64cc5d64566c0
git at public.curoverse.com
git at public.curoverse.com
Thu May 22 14:52:02 EDT 2014
Summary of changes:
sdk/go/src/arvados.org/keepclient/keepclient.go | 28 +++++++++-
.../src/arvados.org/keepclient/keepclient_test.go | 63 +++++++++++-----------
sdk/go/src/arvados.org/keepclient/support.go | 21 ++++----
.../keep/src/arvados.org/keepproxy/keepproxy.go | 34 ++++++++----
.../src/arvados.org/keepproxy/keepproxy_test.go | 8 +--
5 files changed, 96 insertions(+), 58 deletions(-)
via 12f8c88325daf4c6af8cbf091ea64cc5d64566c0 (commit)
from 575457ac8645c61ca71e94ce074291ec002b4c24 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 12f8c88325daf4c6af8cbf091ea64cc5d64566c0
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu May 22 14:51:56 2014 -0400
1885: Made ServiceRoots atomically updatable, so that KeepProxy can poll for
changes in the services list without disrupting any active flows.
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go
index 91989bd..dadf8bf 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@ -11,6 +11,10 @@ import (
"io/ioutil"
"net/http"
"os"
+ "sort"
+ "sync"
+ "sync/atomic"
+ "unsafe"
)
// A Keep "block" is 64MB.
@@ -25,11 +29,12 @@ type KeepClient struct {
ApiServer string
ApiToken string
ApiInsecure bool
- Service_roots []string
Want_replicas int
Client *http.Client
Using_proxy bool
External bool
+ service_roots *[]string
+ lock sync.Mutex
}
// Create a new KeepClient, initialized with standard Arvados environment
@@ -50,7 +55,7 @@ func MakeKeepClient() (kc KeepClient, err error) {
Using_proxy: false,
External: external}
- err = (&kc).discoverKeepServers()
+ err = (&kc).DiscoverKeepServers()
return kc, err
}
@@ -207,3 +212,22 @@ func (this KeepClient) AuthorizedAsk(hash string, signature string,
return 0, "", BlockNotFound
}
+
+// Atomically read the service_roots field.
+func (this *KeepClient) ServiceRoots() []string {
+ r := (*[]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots))))
+ return *r
+}
+
+// Atomically update the service_roots field. Enables you to update
+// service_roots without disrupting any GET or PUT operations that might
+// already be in progress.
+func (this *KeepClient) SetServiceRoots(svc []string) {
+ // Must be sorted for ShuffledServiceRoots() to produce consistent
+ // results.
+ roots := make([]string, len(svc))
+ copy(roots, svc)
+ sort.Strings(roots)
+ atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)),
+ unsafe.Pointer(&roots))
+}
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
index 600d739..395603d 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
@@ -13,7 +13,6 @@ import (
"net/http"
"os"
"os/exec"
- "sort"
"strings"
"testing"
)
@@ -75,13 +74,14 @@ func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
c.Check(kc.Client.Transport.(*http.Transport).TLSClientConfig.InsecureSkipVerify, Equals, true)
c.Assert(err, Equals, nil)
- c.Check(len(kc.Service_roots), Equals, 2)
- c.Check(kc.Service_roots[0], Equals, "http://localhost:25107")
- c.Check(kc.Service_roots[1], Equals, "http://localhost:25108")
+ c.Check(len(kc.ServiceRoots()), Equals, 2)
+ c.Check(kc.ServiceRoots()[0], Equals, "http://localhost:25107")
+ c.Check(kc.ServiceRoots()[1], Equals, "http://localhost:25108")
}
func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
- kc := KeepClient{Service_roots: []string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"}}
+ kc := KeepClient{}
+ kc.SetServiceRoots([]string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"})
// "foo" acbd18db4cc2f85cedef654fccc4a4d8
foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
@@ -265,16 +265,16 @@ func (s *StandaloneSuite) TestPutB(c *C) {
kc.Want_replicas = 2
kc.ApiToken = "abc123"
- kc.Service_roots = make([]string, 5)
+ service_roots := make([]string, 5)
ks := RunSomeFakeKeepServers(st, 5, 2990)
for i := 0; i < len(ks); i += 1 {
- kc.Service_roots[i] = ks[i].url
+ service_roots[i] = ks[i].url
defer ks[i].listener.Close()
}
- sort.Strings(kc.Service_roots)
+ kc.SetServiceRoots(service_roots)
kc.PutB([]byte("foo"))
@@ -306,16 +306,16 @@ func (s *StandaloneSuite) TestPutHR(c *C) {
kc.Want_replicas = 2
kc.ApiToken = "abc123"
- kc.Service_roots = make([]string, 5)
+ service_roots := make([]string, 5)
ks := RunSomeFakeKeepServers(st, 5, 2990)
for i := 0; i < len(ks); i += 1 {
- kc.Service_roots[i] = ks[i].url
+ service_roots[i] = ks[i].url
defer ks[i].listener.Close()
}
- sort.Strings(kc.Service_roots)
+ kc.SetServiceRoots(service_roots)
reader, writer := io.Pipe()
@@ -359,21 +359,21 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) {
kc.Want_replicas = 2
kc.ApiToken = "abc123"
- kc.Service_roots = make([]string, 5)
+ service_roots := make([]string, 5)
ks1 := RunSomeFakeKeepServers(st, 4, 2990)
ks2 := RunSomeFakeKeepServers(fh, 1, 2995)
for i, k := range ks1 {
- kc.Service_roots[i] = k.url
+ service_roots[i] = k.url
defer k.listener.Close()
}
for i, k := range ks2 {
- kc.Service_roots[len(ks1)+i] = k.url
+ service_roots[len(ks1)+i] = k.url
defer k.listener.Close()
}
- sort.Strings(kc.Service_roots)
+ kc.SetServiceRoots(service_roots)
shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
@@ -407,21 +407,21 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
kc.Want_replicas = 2
kc.ApiToken = "abc123"
- kc.Service_roots = make([]string, 5)
+ service_roots := make([]string, 5)
ks1 := RunSomeFakeKeepServers(st, 1, 2990)
ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
for i, k := range ks1 {
- kc.Service_roots[i] = k.url
+ service_roots[i] = k.url
defer k.listener.Close()
}
for i, k := range ks2 {
- kc.Service_roots[len(ks1)+i] = k.url
+ service_roots[len(ks1)+i] = k.url
defer k.listener.Close()
}
- sort.Strings(kc.Service_roots)
+ kc.SetServiceRoots(service_roots)
shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
@@ -464,7 +464,7 @@ func (s *StandaloneSuite) TestGet(c *C) {
kc, _ := MakeKeepClient()
kc.ApiToken = "abc123"
- kc.Service_roots = []string{url}
+ kc.SetServiceRoots([]string{url})
r, n, url2, err := kc.Get(hash)
defer r.Close()
@@ -489,7 +489,7 @@ func (s *StandaloneSuite) TestGetFail(c *C) {
kc, _ := MakeKeepClient()
kc.ApiToken = "abc123"
- kc.Service_roots = []string{url}
+ kc.SetServiceRoots([]string{url})
r, n, url2, err := kc.Get(hash)
c.Check(err, Equals, BlockNotFound)
@@ -518,7 +518,7 @@ func (s *StandaloneSuite) TestChecksum(c *C) {
kc, _ := MakeKeepClient()
kc.ApiToken = "abc123"
- kc.Service_roots = []string{url}
+ kc.SetServiceRoots([]string{url})
r, n, _, err := kc.Get(barhash)
_, err = ioutil.ReadAll(r)
@@ -550,21 +550,21 @@ func (s *StandaloneSuite) TestGetWithFailures(c *C) {
kc, _ := MakeKeepClient()
kc.ApiToken = "abc123"
- kc.Service_roots = make([]string, 5)
+ service_roots := make([]string, 5)
ks1 := RunSomeFakeKeepServers(st, 1, 2990)
ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
for i, k := range ks1 {
- kc.Service_roots[i] = k.url
+ service_roots[i] = k.url
defer k.listener.Close()
}
for i, k := range ks2 {
- kc.Service_roots[len(ks1)+i] = k.url
+ service_roots[len(ks1)+i] = k.url
defer k.listener.Close()
}
- sort.Strings(kc.Service_roots)
+ kc.SetServiceRoots(service_roots)
r, n, url2, err := kc.Get(hash)
<-fh.handled
@@ -635,15 +635,17 @@ func (s *StandaloneSuite) TestPutProxy(c *C) {
kc.Want_replicas = 2
kc.Using_proxy = true
kc.ApiToken = "abc123"
- kc.Service_roots = make([]string, 1)
+ service_roots := make([]string, 1)
ks1 := RunSomeFakeKeepServers(st, 1, 2990)
for i, k := range ks1 {
- kc.Service_roots[i] = k.url
+ service_roots[i] = k.url
defer k.listener.Close()
}
+ kc.SetServiceRoots(service_roots)
+
_, replicas, err := kc.PutB([]byte("foo"))
<-st.handled
@@ -663,14 +665,15 @@ func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
kc.Want_replicas = 3
kc.Using_proxy = true
kc.ApiToken = "abc123"
- kc.Service_roots = make([]string, 1)
+ service_roots := make([]string, 1)
ks1 := RunSomeFakeKeepServers(st, 1, 2990)
for i, k := range ks1 {
- kc.Service_roots[i] = k.url
+ service_roots[i] = k.url
defer k.listener.Close()
}
+ kc.SetServiceRoots(service_roots)
_, replicas, err := kc.PutB([]byte("foo"))
<-st.handled
diff --git a/sdk/go/src/arvados.org/keepclient/support.go b/sdk/go/src/arvados.org/keepclient/support.go
index ef4a8e1..38669a1 100644
--- a/sdk/go/src/arvados.org/keepclient/support.go
+++ b/sdk/go/src/arvados.org/keepclient/support.go
@@ -10,7 +10,6 @@ import (
"log"
"net/http"
"os"
- "sort"
"strconv"
)
@@ -21,10 +20,9 @@ type keepDisk struct {
SvcType string `json:"service_type"`
}
-func (this *KeepClient) discoverKeepServers() error {
+func (this *KeepClient) DiscoverKeepServers() error {
if prx := os.Getenv("ARVADOS_KEEP_PROXY"); prx != "" {
- this.Service_roots = make([]string, 1)
- this.Service_roots[0] = prx
+ this.SetServiceRoots([]string{prx})
this.Using_proxy = true
return nil
}
@@ -72,7 +70,7 @@ func (this *KeepClient) discoverKeepServers() error {
}
listed := make(map[string]bool)
- this.Service_roots = make([]string, 0, len(m.Items))
+ service_roots := make([]string, 0, len(m.Items))
for _, element := range m.Items {
n := ""
@@ -87,16 +85,14 @@ func (this *KeepClient) discoverKeepServers() error {
// Skip duplicates
if !listed[url] {
listed[url] = true
- this.Service_roots = append(this.Service_roots, url)
+ service_roots = append(service_roots, url)
}
if element.SvcType == "proxy" {
this.Using_proxy = true
}
}
- // Must be sorted for ShuffledServiceRoots() to produce consistent
- // results.
- sort.Strings(this.Service_roots)
+ this.SetServiceRoots(service_roots)
return nil
}
@@ -111,11 +107,12 @@ func (this KeepClient) shuffledServiceRoots(hash string) (pseq []string) {
seed := hash
// Keep servers still to be added to the ordering
- pool := make([]string, len(this.Service_roots))
- copy(pool, this.Service_roots)
+ service_roots := this.ServiceRoots()
+ pool := make([]string, len(service_roots))
+ copy(pool, service_roots)
// output probe sequence
- pseq = make([]string, 0, len(this.Service_roots))
+ pseq = make([]string, 0, len(service_roots))
// iterate while there are servers left to be assigned
for len(pool) > 0 {
diff --git a/services/keep/src/arvados.org/keepproxy/keepproxy.go b/services/keep/src/arvados.org/keepproxy/keepproxy.go
index ed33ac9..b914f47 100644
--- a/services/keep/src/arvados.org/keepproxy/keepproxy.go
+++ b/services/keep/src/arvados.org/keepproxy/keepproxy.go
@@ -92,8 +92,10 @@ func main() {
return
}
+ go RefreshServicesList(&kc)
+
// Start listening for requests.
- http.Serve(listener, MakeRESTRouter(!no_get, !no_put, kc))
+ http.Serve(listener, MakeRESTRouter(!no_get, !no_put, &kc))
}
type ApiTokenCache struct {
@@ -102,6 +104,14 @@ type ApiTokenCache struct {
expireTime int64
}
+// Refresh the keep service list every five minutes.
+func RefreshServicesList(kc *keepclient.KeepClient) {
+ for {
+ time.Sleep(300 * time.Second)
+ kc.DiscoverKeepServers()
+ }
+}
+
// Cache the token and set an expire time. If we already have an expire time
// on the token, it is not updated.
func (this *ApiTokenCache) RememberToken(token string) {
@@ -181,12 +191,12 @@ func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, re
}
type GetBlockHandler struct {
- keepclient.KeepClient
+ *keepclient.KeepClient
*ApiTokenCache
}
type PutBlockHandler struct {
- keepclient.KeepClient
+ *keepclient.KeepClient
*ApiTokenCache
}
@@ -197,7 +207,7 @@ type PutBlockHandler struct {
func MakeRESTRouter(
enable_get bool,
enable_put bool,
- kc keepclient.KeepClient) *mux.Router {
+ kc *keepclient.KeepClient) *mux.Router {
t := &ApiTokenCache{tokens: make(map[string]int64), expireTime: 300}
@@ -222,7 +232,9 @@ func MakeRESTRouter(
func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
- if !CheckAuthorizationHeader(this.KeepClient, this.ApiTokenCache, req) {
+ kc := *this.KeepClient
+
+ if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
}
@@ -235,10 +247,10 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
var blocklen int64
if req.Method == "GET" {
- reader, blocklen, _, err = this.KeepClient.AuthorizedGet(hash, signature, timestamp)
+ reader, blocklen, _, err = kc.AuthorizedGet(hash, signature, timestamp)
defer reader.Close()
} else if req.Method == "HEAD" {
- blocklen, _, err = this.KeepClient.AuthorizedAsk(hash, signature, timestamp)
+ blocklen, _, err = kc.AuthorizedAsk(hash, signature, timestamp)
}
resp.Header().Set("Content-Length", fmt.Sprint(blocklen))
@@ -259,7 +271,9 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
log.Print("PutBlockHandler start")
- if !CheckAuthorizationHeader(this.KeepClient, this.ApiTokenCache, req) {
+ kc := *this.KeepClient
+
+ if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
}
@@ -284,12 +298,12 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
var r int
_, err := fmt.Sscanf(req.Header.Get("X-Keep-Desired-Replicas"), "%d", &r)
if err != nil {
- this.KeepClient.Want_replicas = r
+ kc.Want_replicas = r
}
}
// Now try to put the block through
- replicas, err := this.KeepClient.PutHR(hash, req.Body, contentLength)
+ replicas, err := kc.PutHR(hash, req.Body, contentLength)
log.Printf("Replicas stored: %v err: %v", replicas, err)
diff --git a/services/keep/src/arvados.org/keepproxy/keepproxy_test.go b/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
index af1377b..d8abda7 100644
--- a/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
+++ b/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
@@ -109,8 +109,8 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
kc, err := keepclient.MakeKeepClient()
c.Check(kc.External, Equals, true)
c.Check(kc.Using_proxy, Equals, true)
- c.Check(len(kc.Service_roots), Equals, 1)
- c.Check(kc.Service_roots[0], Equals, "http://localhost:29950")
+ c.Check(len(kc.ServiceRoots()), Equals, 1)
+ c.Check(kc.ServiceRoots()[0], Equals, "http://localhost:29950")
c.Check(err, Equals, nil)
os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
@@ -159,8 +159,8 @@ func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
kc.ApiToken = "123xyz"
c.Check(kc.External, Equals, true)
c.Check(kc.Using_proxy, Equals, true)
- c.Check(len(kc.Service_roots), Equals, 1)
- c.Check(kc.Service_roots[0], Equals, "http://localhost:29950")
+ c.Check(len(kc.ServiceRoots()), Equals, 1)
+ c.Check(kc.ServiceRoots()[0], Equals, "http://localhost:29950")
c.Check(err, Equals, nil)
os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list