[ARVADOS] updated: d54a48fa6fe94e9b80bf32c1d357e4dc3b3d67c3

git at public.curoverse.com git at public.curoverse.com
Wed May 21 16:00:09 EDT 2014


Summary of changes:
 sdk/go/src/arvados.org/keepclient/keepclient.go    |  7 ++-
 .../src/arvados.org/keepclient/keepclient_test.go  | 73 ++++++++++++++++++++--
 sdk/go/src/arvados.org/keepclient/support.go       | 70 ++++++++++++++++-----
 3 files changed, 129 insertions(+), 21 deletions(-)

       via  d54a48fa6fe94e9b80bf32c1d357e4dc3b3d67c3 (commit)
      from  c6a6693dc36615effca5e3363b81199362007c59 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit d54a48fa6fe94e9b80bf32c1d357e4dc3b3d67c3
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed May 21 16:00:05 2014 -0400

    2798: Adds client side support for Keep proxy X-Keep-Desired-Replicas and
    X-Keep-Replicas-Stored headers, and ARVADOS_KEEP_PROXY environment variable.
    Added tests.

diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go
index dc3ceed..e16c853 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@ -28,6 +28,7 @@ type KeepClient struct {
 	Service_roots []string
 	Want_replicas int
 	Client        *http.Client
+	Using_proxy   bool
 }
 
 // Create a new KeepClient, initialized with standard Arvados environment
@@ -35,7 +36,7 @@ type KeepClient struct {
 // ARVADOS_API_HOST_INSECURE.  This will contact the API server to discover
 // Keep servers.
 func MakeKeepClient() (kc KeepClient, err error) {
-	insecure := (os.Getenv("ARVADOS_API_HOST_INSECURE") != "")
+	insecure := (os.Getenv("ARVADOS_API_HOST_INSECURE") == "true")
 
 	kc = KeepClient{
 		ApiServer:     os.Getenv("ARVADOS_API_HOST"),
@@ -43,8 +44,8 @@ func MakeKeepClient() (kc KeepClient, err error) {
 		ApiInsecure:   insecure,
 		Want_replicas: 2,
 		Client: &http.Client{Transport: &http.Transport{
-			TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure},
-		}}}
+			TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}}},
+		Using_proxy: false}
 
 	err = (&kc).discoverKeepServers()
 
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
index 1d3bbee..1ef5fd6 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
@@ -161,7 +161,7 @@ func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
 
 			<-st.handled
 			status := <-upload_status
-			c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200})
+			c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1})
 		})
 
 	log.Printf("TestUploadToStubKeepServer done")
@@ -194,7 +194,7 @@ func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
 			<-st.handled
 
 			status := <-upload_status
-			c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200})
+			c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1})
 		})
 
 	log.Printf("TestUploadToStubKeepServerBufferReader done")
@@ -229,8 +229,8 @@ func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
 			<-st.handled
 
 			status := <-upload_status
-			c.Check(status.Url, Equals, fmt.Sprintf("%s/%s", url, hash))
-			c.Check(status.StatusCode, Equals, 500)
+			c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
+			c.Check(status.statusCode, Equals, 500)
 		})
 	log.Printf("TestFailedUploadToStubKeepServer done")
 }
@@ -610,3 +610,68 @@ func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
 		c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
 	}
 }
+
+type StubProxyHandler struct {
+	handled chan string
+}
+
+func (this StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+	resp.Header().Set("X-Keep-Replicas-Stored", "2")
+	this.handled <- fmt.Sprintf("http://%s", req.Host)
+}
+
+func (s *StandaloneSuite) TestPutProxy(c *C) {
+	log.Printf("TestPutProxy")
+
+	st := StubProxyHandler{make(chan string, 1)}
+
+	kc, _ := MakeKeepClient()
+
+	kc.Want_replicas = 2
+	kc.Using_proxy = true
+	kc.ApiToken = "abc123"
+	kc.Service_roots = make([]string, 1)
+
+	ks1 := RunSomeFakeKeepServers(st, 1, 2990)
+
+	for i, k := range ks1 {
+		kc.Service_roots[i] = k.url
+		defer k.listener.Close()
+	}
+
+	_, replicas, err := kc.PutB([]byte("foo"))
+	<-st.handled
+
+	c.Check(err, Equals, nil)
+	c.Check(replicas, Equals, 2)
+
+	log.Printf("TestPutProxy done")
+}
+
+func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
+	log.Printf("TestPutProxy")
+
+	st := StubProxyHandler{make(chan string, 1)}
+
+	kc, _ := MakeKeepClient()
+
+	kc.Want_replicas = 3
+	kc.Using_proxy = true
+	kc.ApiToken = "abc123"
+	kc.Service_roots = make([]string, 1)
+
+	ks1 := RunSomeFakeKeepServers(st, 1, 2990)
+
+	for i, k := range ks1 {
+		kc.Service_roots[i] = k.url
+		defer k.listener.Close()
+	}
+
+	_, replicas, err := kc.PutB([]byte("foo"))
+	<-st.handled
+
+	c.Check(err, Equals, InsufficientReplicasError)
+	c.Check(replicas, Equals, 2)
+
+	log.Printf("TestPutProxy done")
+}
diff --git a/sdk/go/src/arvados.org/keepclient/support.go b/sdk/go/src/arvados.org/keepclient/support.go
index e657a60..d0ea967 100644
--- a/sdk/go/src/arvados.org/keepclient/support.go
+++ b/sdk/go/src/arvados.org/keepclient/support.go
@@ -9,6 +9,7 @@ import (
 	"io"
 	"log"
 	"net/http"
+	"os"
 	"sort"
 	"strconv"
 )
@@ -17,13 +18,22 @@ type keepDisk struct {
 	Hostname string `json:"service_host"`
 	Port     int    `json:"service_port"`
 	SSL      bool   `json:"service_ssl_flag"`
+	SvcType  string `json:"service_type"`
 }
 
 func (this *KeepClient) discoverKeepServers() error {
+	if prx := os.Getenv("ARVADOS_KEEP_PROXY"); prx != "" {
+		this.Service_roots = make([]string, 1)
+		this.Service_roots[0] = prx
+		this.Using_proxy = true
+		return nil
+	}
+
 	// Construct request of keep disk list
 	var req *http.Request
 	var err error
-	if req, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/keep_disks", this.ApiServer), nil); err != nil {
+
+	if req, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/keep_services/accessible", this.ApiServer), nil); err != nil {
 		return err
 	}
 
@@ -36,6 +46,17 @@ func (this *KeepClient) discoverKeepServers() error {
 		return err
 	}
 
+	if resp.StatusCode != 200 {
+		// fall back on keep disks
+		if req, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/keep_disks", this.ApiServer), nil); err != nil {
+			return err
+		}
+		req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
+		if resp, err = this.Client.Do(req); err != nil {
+			return err
+		}
+	}
+
 	type svcList struct {
 		Items []keepDisk `json:"items"`
 	}
@@ -52,6 +73,7 @@ func (this *KeepClient) discoverKeepServers() error {
 
 	for _, element := range m.Items {
 		n := ""
+
 		if element.SSL {
 			n = "s"
 		}
@@ -64,6 +86,9 @@ func (this *KeepClient) discoverKeepServers() error {
 			listed[url] = true
 			this.Service_roots = append(this.Service_roots, url)
 		}
+		if element.SvcType == "proxy" {
+			this.Using_proxy = true
+		}
 	}
 
 	// Must be sorted for ShuffledServiceRoots() to produce consistent
@@ -122,9 +147,10 @@ func (this KeepClient) shuffledServiceRoots(hash string) (pseq []string) {
 }
 
 type uploadStatus struct {
-	Err        error
-	Url        string
-	StatusCode int
+	err             error
+	url             string
+	statusCode      int
+	replicas_stored int
 }
 
 func (this KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
@@ -136,7 +162,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
 	var err error
 	var url = fmt.Sprintf("%s/%s", host, hash)
 	if req, err = http.NewRequest("PUT", url, nil); err != nil {
-		upload_status <- uploadStatus{err, url, 0}
+		upload_status <- uploadStatus{err, url, 0, 0}
 		body.Close()
 		return
 	}
@@ -147,18 +173,28 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
 
 	req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
 	req.Header.Add("Content-Type", "application/octet-stream")
+
+	if this.Using_proxy {
+		req.Header.Add("X-Keep-Desired-Replicas", fmt.Sprint(this.Want_replicas))
+	}
+
 	req.Body = body
 
 	var resp *http.Response
 	if resp, err = this.Client.Do(req); err != nil {
-		upload_status <- uploadStatus{err, url, 0}
+		upload_status <- uploadStatus{err, url, 0, 0}
 		return
 	}
 
+	rep := 1
+	if xr := resp.Header.Get("X-Keep-Replicas-Stored"); xr != "" {
+		fmt.Sscanf(xr, "%d", &rep)
+	}
+
 	if resp.StatusCode == http.StatusOK {
-		upload_status <- uploadStatus{nil, url, resp.StatusCode}
+		upload_status <- uploadStatus{nil, url, resp.StatusCode, rep}
 	} else {
-		upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode}
+		upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep}
 	}
 }
 
@@ -181,6 +217,7 @@ func (this KeepClient) putReplicas(
 	defer close(upload_status)
 
 	// Desired number of replicas
+
 	remaining_replicas := this.Want_replicas
 
 	for remaining_replicas > 0 {
@@ -191,23 +228,28 @@ func (this KeepClient) putReplicas(
 				next_server += 1
 				active += 1
 			} else {
-				return (this.Want_replicas - remaining_replicas), InsufficientReplicasError
+				fmt.Print(active)
+				if active == 0 {
+					return (this.Want_replicas - remaining_replicas), InsufficientReplicasError
+				} else {
+					break
+				}
 			}
 		}
 
 		// Now wait for something to happen.
 		status := <-upload_status
-		if status.StatusCode == 200 {
+		if status.statusCode == 200 {
 			// good news!
-			remaining_replicas -= 1
+			remaining_replicas -= status.replicas_stored
 		} else {
 			// writing to keep server failed for some reason
 			log.Printf("Keep server put to %v failed with '%v'",
-				status.Url, status.Err)
+				status.url, status.err)
 		}
 		active -= 1
-		log.Printf("Upload status %v %v %v", status.StatusCode, remaining_replicas, active)
+		log.Printf("Upload status %v %v %v", status.statusCode, remaining_replicas, active)
 	}
 
-	return (this.Want_replicas - remaining_replicas), nil
+	return this.Want_replicas, nil
 }

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list