[ARVADOS] updated: a88e87137911142ef7e9237703a1073e4feffdb7
git at public.curoverse.com
git at public.curoverse.com
Wed May 13 16:03:18 EDT 2015
Summary of changes:
sdk/go/keepclient/keepclient.go | 16 ++++-
sdk/go/keepclient/keepclient_test.go | 123 +++++++++++++++++++++++++++++++----
sdk/go/keepclient/support.go | 17 +++--
services/keepproxy/keepproxy_test.go | 8 ++-
services/keepstore/pull_worker.go | 2 +-
5 files changed, 146 insertions(+), 20 deletions(-)
via a88e87137911142ef7e9237703a1073e4feffdb7 (commit)
from 731547ae2d548cd2869779c200813262c05581b2 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit a88e87137911142ef7e9237703a1073e4feffdb7
Author: Radhika Chippada <radhika at curoverse.com>
Date: Wed May 13 16:02:41 2015 -0400
4717: use keep_services -> read_only flag in go sdk.
diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index 31cfb57..f908a5a 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -39,6 +39,7 @@ type KeepClient struct {
Using_proxy bool
localRoots *map[string]string
gatewayRoots *map[string]string
+ writableRoots *map[string]string
lock sync.RWMutex
Client *http.Client
}
@@ -194,6 +195,14 @@ func (kc *KeepClient) GatewayRoots() map[string]string {
return *kc.gatewayRoots
}
+// WritableRoots() returns the map of writable Keep services:
+// url -> ""
+func (kc *KeepClient) WritableRoots() map[string]string {
+ kc.lock.RLock()
+ defer kc.lock.RUnlock()
+ return *kc.writableRoots
+}
+
// SetServiceRoots updates the localRoots and gatewayRoots maps,
// without risk of disrupting operations that are already in progress.
//
@@ -201,7 +210,7 @@ func (kc *KeepClient) GatewayRoots() map[string]string {
// caller can reuse/modify them after SetServiceRoots returns, but
// they should not be modified by any other goroutine while
// SetServiceRoots is running.
-func (kc *KeepClient) SetServiceRoots(newLocals, newGateways map[string]string) {
+func (kc *KeepClient) SetServiceRoots(newLocals, newGateways map[string]string, writableRoots map[string]string) {
locals := make(map[string]string)
for uuid, root := range newLocals {
locals[uuid] = root
@@ -210,10 +219,15 @@ func (kc *KeepClient) SetServiceRoots(newLocals, newGateways map[string]string)
for uuid, root := range newGateways {
gateways[uuid] = root
}
+ writables := make(map[string]string)
+ for root, _ := range writableRoots {
+ writables[root] = ""
+ }
kc.lock.Lock()
defer kc.lock.Unlock()
kc.localRoots = &locals
kc.gatewayRoots = &gateways
+ kc.writableRoots = &writables
}
// getSortedRoots returns a list of base URIs of Keep services, in the
diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go
index 236d8db..764cd0d 100644
--- a/sdk/go/keepclient/keepclient_test.go
+++ b/sdk/go/keepclient/keepclient_test.go
@@ -243,15 +243,17 @@ func (s *StandaloneSuite) TestPutB(c *C) {
kc.Want_replicas = 2
arv.ApiToken = "abc123"
localRoots := make(map[string]string)
+ writableRoots := make(map[string]string)
ks := RunSomeFakeKeepServers(st, 5)
for i, k := range ks {
localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ writableRoots[k.url] = ""
defer k.listener.Close()
}
- kc.SetServiceRoots(localRoots, nil)
+ kc.SetServiceRoots(localRoots, nil, writableRoots)
kc.PutB([]byte("foo"))
@@ -286,15 +288,17 @@ func (s *StandaloneSuite) TestPutHR(c *C) {
kc.Want_replicas = 2
arv.ApiToken = "abc123"
localRoots := make(map[string]string)
+ writableRoots := make(map[string]string)
ks := RunSomeFakeKeepServers(st, 5)
for i, k := range ks {
localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ writableRoots[k.url] = ""
defer k.listener.Close()
}
- kc.SetServiceRoots(localRoots, nil)
+ kc.SetServiceRoots(localRoots, nil, writableRoots)
reader, writer := io.Pipe()
@@ -340,20 +344,23 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) {
kc.Want_replicas = 2
arv.ApiToken = "abc123"
localRoots := make(map[string]string)
+ writableRoots := make(map[string]string)
ks1 := RunSomeFakeKeepServers(st, 4)
ks2 := RunSomeFakeKeepServers(fh, 1)
for i, k := range ks1 {
localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ writableRoots[k.url] = ""
defer k.listener.Close()
}
for i, k := range ks2 {
localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
+ writableRoots[k.url] = ""
defer k.listener.Close()
}
- kc.SetServiceRoots(localRoots, nil)
+ kc.SetServiceRoots(localRoots, nil, writableRoots)
shuff := NewRootSorter(
kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
@@ -396,20 +403,23 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
kc.Want_replicas = 2
arv.ApiToken = "abc123"
localRoots := make(map[string]string)
+ writableRoots := make(map[string]string)
ks1 := RunSomeFakeKeepServers(st, 1)
ks2 := RunSomeFakeKeepServers(fh, 4)
for i, k := range ks1 {
localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ writableRoots[k.url] = ""
defer k.listener.Close()
}
for i, k := range ks2 {
localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
+ writableRoots[k.url] = ""
defer k.listener.Close()
}
- kc.SetServiceRoots(localRoots, nil)
+ kc.SetServiceRoots(localRoots, nil, writableRoots)
_, replicas, err := kc.PutB([]byte("foo"))
@@ -454,7 +464,7 @@ func (s *StandaloneSuite) TestGet(c *C) {
arv, err := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
arv.ApiToken = "abc123"
- kc.SetServiceRoots(map[string]string{"x": ks.url}, nil)
+ kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, map[string]string{ks.url: ""})
r, n, url2, err := kc.Get(hash)
defer r.Close()
@@ -480,7 +490,7 @@ func (s *StandaloneSuite) TestGetFail(c *C) {
arv, err := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
arv.ApiToken = "abc123"
- kc.SetServiceRoots(map[string]string{"x": ks.url}, nil)
+ kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, map[string]string{ks.url: ""})
r, n, url2, err := kc.Get(hash)
c.Check(err, Equals, BlockNotFound)
@@ -515,7 +525,8 @@ func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
arv.ApiToken = "abc123"
kc.SetServiceRoots(
map[string]string{"x": ks0.url},
- map[string]string{uuid: ks.url})
+ map[string]string{uuid: ks.url},
+ map[string]string{ks0.url: "", ks.url: ""})
r, n, uri, err := kc.Get(hash + "+K@" + uuid)
defer r.Close()
@@ -566,6 +577,7 @@ func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) {
"zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
"zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
uuid: ks.url},
+ map[string]string{ks.url: ""},
)
r, n, uri, err := kc.Get(hash + "+K@" + uuid)
@@ -603,7 +615,8 @@ func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) {
arv.ApiToken = "abc123"
kc.SetServiceRoots(
map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url},
- map[string]string{uuid: ksGateway.url})
+ map[string]string{uuid: ksGateway.url},
+ map[string]string{ksLocal.url: "", ksGateway.url: ""})
r, n, uri, err := kc.Get(hash + "+K@" + uuid)
c.Assert(err, Equals, nil)
@@ -637,7 +650,7 @@ func (s *StandaloneSuite) TestChecksum(c *C) {
arv, err := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
arv.ApiToken = "abc123"
- kc.SetServiceRoots(map[string]string{"x": ks.url}, nil)
+ kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, map[string]string{ks.url: ""})
r, n, _, err := kc.Get(barhash)
_, err = ioutil.ReadAll(r)
@@ -672,20 +685,23 @@ func (s *StandaloneSuite) TestGetWithFailures(c *C) {
kc, _ := MakeKeepClient(&arv)
arv.ApiToken = "abc123"
localRoots := make(map[string]string)
+ writableRoots := make(map[string]string)
ks1 := RunSomeFakeKeepServers(st, 1)
ks2 := RunSomeFakeKeepServers(fh, 4)
for i, k := range ks1 {
localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ writableRoots[k.url] = ""
defer k.listener.Close()
}
for i, k := range ks2 {
localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
+ writableRoots[k.url] = ""
defer k.listener.Close()
}
- kc.SetServiceRoots(localRoots, nil)
+ kc.SetServiceRoots(localRoots, nil, writableRoots)
// This test works only if one of the failing services is
// attempted before the succeeding service. Otherwise,
@@ -766,15 +782,17 @@ func (s *StandaloneSuite) TestPutProxy(c *C) {
kc.Using_proxy = true
arv.ApiToken = "abc123"
localRoots := make(map[string]string)
+ writableRoots := make(map[string]string)
ks1 := RunSomeFakeKeepServers(st, 1)
for i, k := range ks1 {
localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ writableRoots[k.url] = ""
defer k.listener.Close()
}
- kc.SetServiceRoots(localRoots, nil)
+ kc.SetServiceRoots(localRoots, nil, writableRoots)
_, replicas, err := kc.PutB([]byte("foo"))
<-st.handled
@@ -797,14 +815,16 @@ func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
kc.Using_proxy = true
arv.ApiToken = "abc123"
localRoots := make(map[string]string)
+ writableRoots := make(map[string]string)
ks1 := RunSomeFakeKeepServers(st, 1)
for i, k := range ks1 {
localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ writableRoots[k.url] = ""
defer k.listener.Close()
}
- kc.SetServiceRoots(localRoots, nil)
+ kc.SetServiceRoots(localRoots, nil, writableRoots)
_, replicas, err := kc.PutB([]byte("foo"))
<-st.handled
@@ -853,3 +873,82 @@ func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) {
_, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31c")
c.Check(err, Equals, InvalidLocatorError)
}
+
+func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableRoots(c *C) {
+ log.Printf("TestPutWant2ReplicasWithOnlyOneWritableRoots")
+
+ hash := Md5String("foo")
+
+ st := StubPutHandler{
+ c,
+ hash,
+ "abc123",
+ "foo",
+ make(chan string, 5)}
+
+ arv, _ := arvadosclient.MakeArvadosClient()
+ kc, _ := MakeKeepClient(&arv)
+
+ kc.Want_replicas = 2
+ arv.ApiToken = "abc123"
+ localRoots := make(map[string]string)
+ writableRoots := make(map[string]string)
+
+ ks := RunSomeFakeKeepServers(st, 5)
+
+ for i, k := range ks {
+ localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ if i == 0 {
+ writableRoots[k.url] = ""
+ }
+ defer k.listener.Close()
+ }
+
+ kc.SetServiceRoots(localRoots, nil, writableRoots)
+
+ _, replicas, err := kc.PutB([]byte("foo"))
+
+ c.Check(err, Equals, InsufficientReplicasError)
+ c.Check(replicas, Equals, 1)
+
+ c.Check(<-st.handled, Equals, localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", 0)])
+
+ log.Printf("TestPutWant2ReplicasWithOnlyOneWritableRoots done")
+}
+
+func (s *StandaloneSuite) TestPutBWithNoWritableRoots(c *C) {
+ log.Printf("TestPutBWithNoWritableRoots")
+
+ hash := Md5String("foo")
+
+ st := StubPutHandler{
+ c,
+ hash,
+ "abc123",
+ "foo",
+ make(chan string, 5)}
+
+ arv, _ := arvadosclient.MakeArvadosClient()
+ kc, _ := MakeKeepClient(&arv)
+
+ kc.Want_replicas = 2
+ arv.ApiToken = "abc123"
+ localRoots := make(map[string]string)
+ writableRoots := make(map[string]string)
+
+ ks := RunSomeFakeKeepServers(st, 5)
+
+ for i, k := range ks {
+ localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ defer k.listener.Close()
+ }
+
+ kc.SetServiceRoots(localRoots, nil, writableRoots)
+
+ _, replicas, err := kc.PutB([]byte("foo"))
+
+ c.Check(err, Equals, InsufficientReplicasError)
+ c.Check(replicas, Equals, 0)
+
+ log.Printf("TestPutBWithNoWritableRoots done")
+}
diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go
index 6393503..e9c8d68 100644
--- a/sdk/go/keepclient/support.go
+++ b/sdk/go/keepclient/support.go
@@ -21,6 +21,7 @@ type keepDisk struct {
Port int `json:"service_port"`
SSL bool `json:"service_ssl_flag"`
SvcType string `json:"service_type"`
+ ReadOnly bool `json:"read_only"`
}
func Md5String(s string) string {
@@ -93,6 +94,7 @@ func (this *KeepClient) DiscoverKeepServers() error {
listed := make(map[string]bool)
localRoots := make(map[string]string)
gatewayRoots := make(map[string]string)
+ writableRoots := make(map[string]string)
for _, service := range m.Items {
scheme := "http"
@@ -114,6 +116,11 @@ func (this *KeepClient) DiscoverKeepServers() error {
localRoots[service.Uuid] = url
this.Using_proxy = true
}
+
+ if service.ReadOnly == false {
+ writableRoots[url] = ""
+ }
+
// Gateway services are only used when specified by
// UUID, so there's nothing to gain by filtering them
// by service type. Including all accessible services
@@ -128,7 +135,7 @@ func (this *KeepClient) DiscoverKeepServers() error {
this.setClientSettingsStore()
}
- this.SetServiceRoots(localRoots, gatewayRoots)
+ this.SetServiceRoots(localRoots, gatewayRoots, writableRoots)
return nil
}
@@ -231,10 +238,12 @@ func (this KeepClient) putReplicas(
for active < remaining_replicas {
// Start some upload requests
if next_server < len(sv) {
- log.Printf("[%v] Begin upload %s to %s", requestId, hash, sv[next_server])
- go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestId)
+ if _, ok := this.WritableRoots()[sv[next_server]]; ok { // If writable
+ log.Printf("[%v] Begin upload %s to %s", requestId, hash, sv[next_server])
+ go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestId)
+ active += 1
+ }
next_server += 1
- active += 1
} else {
if active == 0 {
return locator, (this.Want_replicas - remaining_replicas), InsufficientReplicasError
diff --git a/services/keepproxy/keepproxy_test.go b/services/keepproxy/keepproxy_test.go
index a04e9fb..3530ac8 100644
--- a/services/keepproxy/keepproxy_test.go
+++ b/services/keepproxy/keepproxy_test.go
@@ -115,9 +115,13 @@ func runProxy(c *C, args []string, port int, bogusClientToken bool) keepclient.K
Using_proxy: true,
Client: &http.Client{},
}
- kc.SetServiceRoots(map[string]string{
+ locals := map[string]string{
"proxy": fmt.Sprintf("http://localhost:%v", port),
- }, nil)
+ }
+ writables := map[string]string{
+ fmt.Sprintf("http://localhost:%v", port): "",
+ }
+ kc.SetServiceRoots(locals, nil, writables)
c.Check(kc.Using_proxy, Equals, true)
c.Check(len(kc.LocalRoots()), Equals, 1)
for _, root := range kc.LocalRoots() {
diff --git a/services/keepstore/pull_worker.go b/services/keepstore/pull_worker.go
index d85458a..2707b98 100644
--- a/services/keepstore/pull_worker.go
+++ b/services/keepstore/pull_worker.go
@@ -46,7 +46,7 @@ func PullItemAndProcess(pullRequest PullRequest, token string, keepClient *keepc
for _, addr := range pullRequest.Servers {
service_roots[addr] = addr
}
- keepClient.SetServiceRoots(service_roots, nil)
+ keepClient.SetServiceRoots(service_roots, nil, keepClient.WritableRoots())
// Generate signature with a random token
expires_at := time.Now().Add(60 * time.Second)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list