[ARVADOS] updated: a88e87137911142ef7e9237703a1073e4feffdb7

git at public.curoverse.com git at public.curoverse.com
Wed May 13 16:03:18 EDT 2015


Summary of changes:
 sdk/go/keepclient/keepclient.go      |  16 ++++-
 sdk/go/keepclient/keepclient_test.go | 123 +++++++++++++++++++++++++++++++----
 sdk/go/keepclient/support.go         |  17 +++--
 services/keepproxy/keepproxy_test.go |   8 ++-
 services/keepstore/pull_worker.go    |   2 +-
 5 files changed, 146 insertions(+), 20 deletions(-)

       via  a88e87137911142ef7e9237703a1073e4feffdb7 (commit)
      from  731547ae2d548cd2869779c200813262c05581b2 (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit a88e87137911142ef7e9237703a1073e4feffdb7
Author: Radhika Chippada <radhika at curoverse.com>
Date:   Wed May 13 16:02:41 2015 -0400

    4717: use keep_services -> read_only flag in go sdk.

diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index 31cfb57..f908a5a 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -39,6 +39,7 @@ type KeepClient struct {
 	Using_proxy   bool
 	localRoots    *map[string]string
 	gatewayRoots  *map[string]string
+	writableRoots *map[string]string
 	lock          sync.RWMutex
 	Client        *http.Client
 }
@@ -194,6 +195,14 @@ func (kc *KeepClient) GatewayRoots() map[string]string {
 	return *kc.gatewayRoots
 }
 
+// WritableRoots() returns the map of writable Keep services:
+// url -> ""
+func (kc *KeepClient) WritableRoots() map[string]string {
+	kc.lock.RLock()
+	defer kc.lock.RUnlock()
+	return *kc.writableRoots
+}
+
 // SetServiceRoots updates the localRoots and gatewayRoots maps,
 // without risk of disrupting operations that are already in progress.
 //
@@ -201,7 +210,7 @@ func (kc *KeepClient) GatewayRoots() map[string]string {
 // caller can reuse/modify them after SetServiceRoots returns, but
 // they should not be modified by any other goroutine while
 // SetServiceRoots is running.
-func (kc *KeepClient) SetServiceRoots(newLocals, newGateways map[string]string) {
+func (kc *KeepClient) SetServiceRoots(newLocals, newGateways map[string]string, writableRoots map[string]string) {
 	locals := make(map[string]string)
 	for uuid, root := range newLocals {
 		locals[uuid] = root
@@ -210,10 +219,15 @@ func (kc *KeepClient) SetServiceRoots(newLocals, newGateways map[string]string)
 	for uuid, root := range newGateways {
 		gateways[uuid] = root
 	}
+	writables := make(map[string]string)
+	for root, _ := range writableRoots {
+		writables[root] = ""
+	}
 	kc.lock.Lock()
 	defer kc.lock.Unlock()
 	kc.localRoots = &locals
 	kc.gatewayRoots = &gateways
+	kc.writableRoots = &writables
 }
 
 // getSortedRoots returns a list of base URIs of Keep services, in the
diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go
index 236d8db..764cd0d 100644
--- a/sdk/go/keepclient/keepclient_test.go
+++ b/sdk/go/keepclient/keepclient_test.go
@@ -243,15 +243,17 @@ func (s *StandaloneSuite) TestPutB(c *C) {
 	kc.Want_replicas = 2
 	arv.ApiToken = "abc123"
 	localRoots := make(map[string]string)
+	writableRoots := make(map[string]string)
 
 	ks := RunSomeFakeKeepServers(st, 5)
 
 	for i, k := range ks {
 		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+		writableRoots[k.url] = ""
 		defer k.listener.Close()
 	}
 
-	kc.SetServiceRoots(localRoots, nil)
+	kc.SetServiceRoots(localRoots, nil, writableRoots)
 
 	kc.PutB([]byte("foo"))
 
@@ -286,15 +288,17 @@ func (s *StandaloneSuite) TestPutHR(c *C) {
 	kc.Want_replicas = 2
 	arv.ApiToken = "abc123"
 	localRoots := make(map[string]string)
+	writableRoots := make(map[string]string)
 
 	ks := RunSomeFakeKeepServers(st, 5)
 
 	for i, k := range ks {
 		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+		writableRoots[k.url] = ""
 		defer k.listener.Close()
 	}
 
-	kc.SetServiceRoots(localRoots, nil)
+	kc.SetServiceRoots(localRoots, nil, writableRoots)
 
 	reader, writer := io.Pipe()
 
@@ -340,20 +344,23 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) {
 	kc.Want_replicas = 2
 	arv.ApiToken = "abc123"
 	localRoots := make(map[string]string)
+	writableRoots := make(map[string]string)
 
 	ks1 := RunSomeFakeKeepServers(st, 4)
 	ks2 := RunSomeFakeKeepServers(fh, 1)
 
 	for i, k := range ks1 {
 		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+		writableRoots[k.url] = ""
 		defer k.listener.Close()
 	}
 	for i, k := range ks2 {
 		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
+		writableRoots[k.url] = ""
 		defer k.listener.Close()
 	}
 
-	kc.SetServiceRoots(localRoots, nil)
+	kc.SetServiceRoots(localRoots, nil, writableRoots)
 
 	shuff := NewRootSorter(
 		kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
@@ -396,20 +403,23 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
 	kc.Want_replicas = 2
 	arv.ApiToken = "abc123"
 	localRoots := make(map[string]string)
+	writableRoots := make(map[string]string)
 
 	ks1 := RunSomeFakeKeepServers(st, 1)
 	ks2 := RunSomeFakeKeepServers(fh, 4)
 
 	for i, k := range ks1 {
 		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+		writableRoots[k.url] = ""
 		defer k.listener.Close()
 	}
 	for i, k := range ks2 {
 		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
+		writableRoots[k.url] = ""
 		defer k.listener.Close()
 	}
 
-	kc.SetServiceRoots(localRoots, nil)
+	kc.SetServiceRoots(localRoots, nil, writableRoots)
 
 	_, replicas, err := kc.PutB([]byte("foo"))
 
@@ -454,7 +464,7 @@ func (s *StandaloneSuite) TestGet(c *C) {
 	arv, err := arvadosclient.MakeArvadosClient()
 	kc, _ := MakeKeepClient(&arv)
 	arv.ApiToken = "abc123"
-	kc.SetServiceRoots(map[string]string{"x": ks.url}, nil)
+	kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, map[string]string{ks.url: ""})
 
 	r, n, url2, err := kc.Get(hash)
 	defer r.Close()
@@ -480,7 +490,7 @@ func (s *StandaloneSuite) TestGetFail(c *C) {
 	arv, err := arvadosclient.MakeArvadosClient()
 	kc, _ := MakeKeepClient(&arv)
 	arv.ApiToken = "abc123"
-	kc.SetServiceRoots(map[string]string{"x": ks.url}, nil)
+	kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, map[string]string{ks.url: ""})
 
 	r, n, url2, err := kc.Get(hash)
 	c.Check(err, Equals, BlockNotFound)
@@ -515,7 +525,8 @@ func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
 	arv.ApiToken = "abc123"
 	kc.SetServiceRoots(
 		map[string]string{"x": ks0.url},
-		map[string]string{uuid: ks.url})
+		map[string]string{uuid: ks.url},
+		map[string]string{ks0.url: "", ks.url: ""})
 
 	r, n, uri, err := kc.Get(hash + "+K@" + uuid)
 	defer r.Close()
@@ -566,6 +577,7 @@ func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) {
 			"zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
 			"zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
 			uuid: ks.url},
+		map[string]string{ks.url: ""},
 	)
 
 	r, n, uri, err := kc.Get(hash + "+K@" + uuid)
@@ -603,7 +615,8 @@ func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) {
 	arv.ApiToken = "abc123"
 	kc.SetServiceRoots(
 		map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url},
-		map[string]string{uuid: ksGateway.url})
+		map[string]string{uuid: ksGateway.url},
+		map[string]string{ksLocal.url: "", ksGateway.url: ""})
 
 	r, n, uri, err := kc.Get(hash + "+K@" + uuid)
 	c.Assert(err, Equals, nil)
@@ -637,7 +650,7 @@ func (s *StandaloneSuite) TestChecksum(c *C) {
 	arv, err := arvadosclient.MakeArvadosClient()
 	kc, _ := MakeKeepClient(&arv)
 	arv.ApiToken = "abc123"
-	kc.SetServiceRoots(map[string]string{"x": ks.url}, nil)
+	kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, map[string]string{ks.url: ""})
 
 	r, n, _, err := kc.Get(barhash)
 	_, err = ioutil.ReadAll(r)
@@ -672,20 +685,23 @@ func (s *StandaloneSuite) TestGetWithFailures(c *C) {
 	kc, _ := MakeKeepClient(&arv)
 	arv.ApiToken = "abc123"
 	localRoots := make(map[string]string)
+	writableRoots := make(map[string]string)
 
 	ks1 := RunSomeFakeKeepServers(st, 1)
 	ks2 := RunSomeFakeKeepServers(fh, 4)
 
 	for i, k := range ks1 {
 		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+		writableRoots[k.url] = ""
 		defer k.listener.Close()
 	}
 	for i, k := range ks2 {
 		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
+		writableRoots[k.url] = ""
 		defer k.listener.Close()
 	}
 
-	kc.SetServiceRoots(localRoots, nil)
+	kc.SetServiceRoots(localRoots, nil, writableRoots)
 
 	// This test works only if one of the failing services is
 	// attempted before the succeeding service. Otherwise,
@@ -766,15 +782,17 @@ func (s *StandaloneSuite) TestPutProxy(c *C) {
 	kc.Using_proxy = true
 	arv.ApiToken = "abc123"
 	localRoots := make(map[string]string)
+	writableRoots := make(map[string]string)
 
 	ks1 := RunSomeFakeKeepServers(st, 1)
 
 	for i, k := range ks1 {
 		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+		writableRoots[k.url] = ""
 		defer k.listener.Close()
 	}
 
-	kc.SetServiceRoots(localRoots, nil)
+	kc.SetServiceRoots(localRoots, nil, writableRoots)
 
 	_, replicas, err := kc.PutB([]byte("foo"))
 	<-st.handled
@@ -797,14 +815,16 @@ func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
 	kc.Using_proxy = true
 	arv.ApiToken = "abc123"
 	localRoots := make(map[string]string)
+	writableRoots := make(map[string]string)
 
 	ks1 := RunSomeFakeKeepServers(st, 1)
 
 	for i, k := range ks1 {
 		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+		writableRoots[k.url] = ""
 		defer k.listener.Close()
 	}
-	kc.SetServiceRoots(localRoots, nil)
+	kc.SetServiceRoots(localRoots, nil, writableRoots)
 
 	_, replicas, err := kc.PutB([]byte("foo"))
 	<-st.handled
@@ -853,3 +873,82 @@ func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) {
 	_, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31c")
 	c.Check(err, Equals, InvalidLocatorError)
 }
+
+func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableRoots(c *C) {
+	log.Printf("TestPutWant2ReplicasWithOnlyOneWritableRoots")
+
+	hash := Md5String("foo")
+
+	st := StubPutHandler{
+		c,
+		hash,
+		"abc123",
+		"foo",
+		make(chan string, 5)}
+
+	arv, _ := arvadosclient.MakeArvadosClient()
+	kc, _ := MakeKeepClient(&arv)
+
+	kc.Want_replicas = 2
+	arv.ApiToken = "abc123"
+	localRoots := make(map[string]string)
+	writableRoots := make(map[string]string)
+
+	ks := RunSomeFakeKeepServers(st, 5)
+
+	for i, k := range ks {
+		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+		if i == 0 {
+			writableRoots[k.url] = ""
+		}
+		defer k.listener.Close()
+	}
+
+	kc.SetServiceRoots(localRoots, nil, writableRoots)
+
+	_, replicas, err := kc.PutB([]byte("foo"))
+
+	c.Check(err, Equals, InsufficientReplicasError)
+	c.Check(replicas, Equals, 1)
+
+	c.Check(<-st.handled, Equals, localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", 0)])
+
+	log.Printf("TestPutWant2ReplicasWithOnlyOneWritableRoots done")
+}
+
+func (s *StandaloneSuite) TestPutBWithNoWritableRoots(c *C) {
+	log.Printf("TestPutBWithNoWritableRoots")
+
+	hash := Md5String("foo")
+
+	st := StubPutHandler{
+		c,
+		hash,
+		"abc123",
+		"foo",
+		make(chan string, 5)}
+
+	arv, _ := arvadosclient.MakeArvadosClient()
+	kc, _ := MakeKeepClient(&arv)
+
+	kc.Want_replicas = 2
+	arv.ApiToken = "abc123"
+	localRoots := make(map[string]string)
+	writableRoots := make(map[string]string)
+
+	ks := RunSomeFakeKeepServers(st, 5)
+
+	for i, k := range ks {
+		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+		defer k.listener.Close()
+	}
+
+	kc.SetServiceRoots(localRoots, nil, writableRoots)
+
+	_, replicas, err := kc.PutB([]byte("foo"))
+
+	c.Check(err, Equals, InsufficientReplicasError)
+	c.Check(replicas, Equals, 0)
+
+	log.Printf("TestPutBWithNoWritableRoots done")
+}
diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go
index 6393503..e9c8d68 100644
--- a/sdk/go/keepclient/support.go
+++ b/sdk/go/keepclient/support.go
@@ -21,6 +21,7 @@ type keepDisk struct {
 	Port     int    `json:"service_port"`
 	SSL      bool   `json:"service_ssl_flag"`
 	SvcType  string `json:"service_type"`
+	ReadOnly bool   `json:"read_only"`
 }
 
 func Md5String(s string) string {
@@ -93,6 +94,7 @@ func (this *KeepClient) DiscoverKeepServers() error {
 	listed := make(map[string]bool)
 	localRoots := make(map[string]string)
 	gatewayRoots := make(map[string]string)
+	writableRoots := make(map[string]string)
 
 	for _, service := range m.Items {
 		scheme := "http"
@@ -114,6 +116,11 @@ func (this *KeepClient) DiscoverKeepServers() error {
 			localRoots[service.Uuid] = url
 			this.Using_proxy = true
 		}
+
+		if service.ReadOnly == false {
+			writableRoots[url] = ""
+		}
+
 		// Gateway services are only used when specified by
 		// UUID, so there's nothing to gain by filtering them
 		// by service type. Including all accessible services
@@ -128,7 +135,7 @@ func (this *KeepClient) DiscoverKeepServers() error {
 		this.setClientSettingsStore()
 	}
 
-	this.SetServiceRoots(localRoots, gatewayRoots)
+	this.SetServiceRoots(localRoots, gatewayRoots, writableRoots)
 	return nil
 }
 
@@ -231,10 +238,12 @@ func (this KeepClient) putReplicas(
 		for active < remaining_replicas {
 			// Start some upload requests
 			if next_server < len(sv) {
-				log.Printf("[%v] Begin upload %s to %s", requestId, hash, sv[next_server])
-				go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestId)
+				if _, ok := this.WritableRoots()[sv[next_server]]; ok { // If writable
+					log.Printf("[%v] Begin upload %s to %s", requestId, hash, sv[next_server])
+					go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestId)
+					active += 1
+				}
 				next_server += 1
-				active += 1
 			} else {
 				if active == 0 {
 					return locator, (this.Want_replicas - remaining_replicas), InsufficientReplicasError
diff --git a/services/keepproxy/keepproxy_test.go b/services/keepproxy/keepproxy_test.go
index a04e9fb..3530ac8 100644
--- a/services/keepproxy/keepproxy_test.go
+++ b/services/keepproxy/keepproxy_test.go
@@ -115,9 +115,13 @@ func runProxy(c *C, args []string, port int, bogusClientToken bool) keepclient.K
 		Using_proxy:   true,
 		Client:        &http.Client{},
 	}
-	kc.SetServiceRoots(map[string]string{
+	locals := map[string]string{
 		"proxy": fmt.Sprintf("http://localhost:%v", port),
-	}, nil)
+	}
+	writables := map[string]string{
+		fmt.Sprintf("http://localhost:%v", port): "",
+	}
+	kc.SetServiceRoots(locals, nil, writables)
 	c.Check(kc.Using_proxy, Equals, true)
 	c.Check(len(kc.LocalRoots()), Equals, 1)
 	for _, root := range kc.LocalRoots() {
diff --git a/services/keepstore/pull_worker.go b/services/keepstore/pull_worker.go
index d85458a..2707b98 100644
--- a/services/keepstore/pull_worker.go
+++ b/services/keepstore/pull_worker.go
@@ -46,7 +46,7 @@ func PullItemAndProcess(pullRequest PullRequest, token string, keepClient *keepc
 	for _, addr := range pullRequest.Servers {
 		service_roots[addr] = addr
 	}
-	keepClient.SetServiceRoots(service_roots, nil)
+	keepClient.SetServiceRoots(service_roots, nil, keepClient.WritableRoots())
 
 	// Generate signature with a random token
 	expires_at := time.Now().Add(60 * time.Second)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list