[ARVADOS] updated: c98ae6385a00eaaf4a095df092aea84f02d50ce8

git at public.curoverse.com git at public.curoverse.com
Fri May 23 15:52:27 EDT 2014


Summary of changes:
 apps/workbench/app/assets/javascripts/editable.js  |  4 ++
 .../app/assets/javascripts/pipeline_instances.js   | 79 +++++++++++-----------
 apps/workbench/app/assets/javascripts/selection.js |  2 +-
 .../controllers/pipeline_instances_controller.rb   |  6 +-
 apps/workbench/app/helpers/application_helper.rb   |  4 +-
 .../pipeline_instances/_show_components.html.erb   |  8 ++-
 .../_show_components_editable.html.erb             |  2 +-
 .../views/pipeline_instances/_show_inputs.html.erb | 50 ++++++++++++++
 .../views/pipeline_templates/_show_recent.html.erb | 39 +++++------
 .../test/integration/pipeline_instances_test.rb    | 56 +++++++++++++++
 apps/workbench/test/integration/smoke_test.rb      |  4 ++
 apps/workbench/test/integration_helper.rb          | 16 +++++
 services/api/test/fixtures/collections.yml         |  8 +--
 services/api/test/fixtures/pipeline_templates.yml  |  4 +-
 14 files changed, 209 insertions(+), 73 deletions(-)
 create mode 100644 apps/workbench/app/views/pipeline_instances/_show_inputs.html.erb
 create mode 100644 apps/workbench/test/integration/pipeline_instances_test.rb

       via  c98ae6385a00eaaf4a095df092aea84f02d50ce8 (commit)
       via  ca77ae5cb94c31232abc8923174280fd848ef408 (commit)
       via  43ea53c8df3d9ea64615cb05d0909d256b9ae38c (commit)
       via  d035296164d63af58bcbb75d81b7cf6fe53f077c (commit)
       via  e08308d3b0797264219a0a5e9967e8e0ec2e9202 (commit)
       via  5459127f0ce9d3bf6ee12a034d9a7df0f4f75a39 (commit)
       via  cbf78e13973654510c5cc33b6420667611ef5245 (commit)
       via  b28565c8aa08cbf70762fa69e49c5067fcb57e96 (commit)
       via  140944b37fac0f631e8366222b2cbc5d5373ba8a (commit)
       via  e35cb0f48bbead47ad0c628c280bfc2cc32035ac (commit)
       via  d61f27791aa739dfc93c2f953236fffd4f0fcf6c (commit)
       via  89096c06922b406f7157082de410e40960f5c73e (commit)
       via  9d6ce80869e187a7c5a574ea5a5272bb89dd81ce (commit)
       via  e2dc7f5b9068e23822391bd3cf987b6f14e83ec0 (commit)
       via  1797ee0dea063ff59361740ee512c84d086dacaf (commit)
      from  b646cec74484bf07a54f4be2de712f50dc387aa0 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit c98ae6385a00eaaf4a095df092aea84f02d50ce8
Merge: ca77ae5 b646cec
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri May 23 15:52:20 2014 -0400

    Merge branch '1885-keep-proxy' of git.curoverse.com:arvados into 1885-keep-proxy refs #1885
    
    # Please enter a commit message to explain why this merge is necessary,
    # especially if it merges an updated upstream into a topic branch.
    #
    # Lines starting with '#' will be ignored, and an empty message aborts
    # the commit.


commit ca77ae5cb94c31232abc8923174280fd848ef408
Merge: b28565c 43ea53c
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri May 23 15:37:57 2014 -0400

    Merge branch '1885-keep-proxy' closes #1885


commit 43ea53c8df3d9ea64615cb05d0909d256b9ae38c
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri May 23 14:08:53 2014 -0400

    1885: Added logging of invalid requests.  Added logging when the server list is
    updated.  Improved KeepClient error reporting.

diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go
index 5ba6257..8d26b32 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@ -23,6 +23,8 @@ const BLOCKSIZE = 64 * 1024 * 1024
 var BlockNotFound = errors.New("Block not found")
 var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
 var OversizeBlockError = errors.New("Block too big")
+var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST")
+var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN")
 
 const X_Keep_Desired_Replicas = "X-Keep-Desired-Replicas"
 const X_Keep_Replicas_Stored = "X-Keep-Replicas-Stored"
@@ -58,6 +60,13 @@ func MakeKeepClient() (kc KeepClient, err error) {
 		Using_proxy: false,
 		External:    external}
 
+	if os.Getenv("ARVADOS_API_HOST") == "" {
+		return kc, MissingArvadosApiHost
+	}
+	if os.Getenv("ARVADOS_API_TOKEN") == "" {
+		return kc, MissingArvadosApiToken
+	}
+
 	err = (&kc).DiscoverKeepServers()
 
 	return kc, err
diff --git a/sdk/go/src/arvados.org/keepclient/support.go b/sdk/go/src/arvados.org/keepclient/support.go
index f924137..913f7c7 100644
--- a/sdk/go/src/arvados.org/keepclient/support.go
+++ b/sdk/go/src/arvados.org/keepclient/support.go
@@ -47,7 +47,7 @@ func (this *KeepClient) DiscoverKeepServers() error {
 		return err
 	}
 
-	if resp.StatusCode != 200 {
+	if resp.StatusCode != http.StatusOK {
 		// fall back on keep disks
 		if req, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/keep_disks", this.ApiServer), nil); err != nil {
 			return err
@@ -56,6 +56,9 @@ func (this *KeepClient) DiscoverKeepServers() error {
 		if resp, err = this.Client.Do(req); err != nil {
 			return err
 		}
+		if resp.StatusCode != http.StatusOK {
+			return errors.New(resp.Status)
+		}
 	}
 
 	type svcList struct {
diff --git a/services/keep/src/arvados.org/keepproxy/keepproxy.go b/services/keep/src/arvados.org/keepproxy/keepproxy.go
index 9e0b2ff..2d7e276 100644
--- a/services/keep/src/arvados.org/keepproxy/keepproxy.go
+++ b/services/keep/src/arvados.org/keepproxy/keepproxy.go
@@ -65,7 +65,7 @@ func main() {
 
 	kc, err := keepclient.MakeKeepClient()
 	if err != nil {
-		log.Fatal(err)
+		log.Fatalf("Error setting up keep client %s", err.Error())
 	}
 
 	if pidfile != "" {
@@ -87,6 +87,8 @@ func main() {
 
 	go RefreshServicesList(&kc)
 
+	log.Printf("Arvados Keep proxy started listening on %v with server list %v", listener.Addr(), kc.ServiceRoots())
+
 	// Start listening for requests.
 	http.Serve(listener, MakeRESTRouter(!no_get, !no_put, &kc))
 }
@@ -101,7 +103,14 @@ type ApiTokenCache struct {
 func RefreshServicesList(kc *keepclient.KeepClient) {
 	for {
 		time.Sleep(300 * time.Second)
+		oldservices := kc.ServiceRoots()
 		kc.DiscoverKeepServers()
+		newservices := kc.ServiceRoots()
+		s1 := fmt.Sprint(oldservices)
+		s2 := fmt.Sprint(newservices)
+		if s1 != s2 {
+			log.Printf("Updated server list to %v", s2)
+		}
 	}
 }
 
@@ -167,7 +176,7 @@ func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, re
 
 	var usersreq *http.Request
 
-	if usersreq, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/users/current", kc.ApiServer), nil); err != nil {
+	if usersreq, err = http.NewRequest("HEAD", fmt.Sprintf("https://%s/arvados/v1/users/current", kc.ApiServer), nil); err != nil {
 		// Can't construct the request
 		log.Printf("%s: CheckAuthorizationHeader error: %v", GetRemoteAddress(req), err)
 		return false
@@ -206,6 +215,8 @@ type PutBlockHandler struct {
 	*ApiTokenCache
 }
 
+type InvalidPathHandler struct{}
+
 // MakeRESTRouter
 //     Returns a mux.Router that passes GET and PUT requests to the
 //     appropriate handlers.
@@ -233,9 +244,16 @@ func MakeRESTRouter(
 		ph.Methods("PUT")
 	}
 
+	rest.NotFoundHandler = InvalidPathHandler{}
+
 	return rest
 }
 
+func (this InvalidPathHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+	log.Printf("%s: %s %s unroutable", GetRemoteAddress(req), req.Method, req.URL.Path)
+	http.Error(resp, "Bad request", http.StatusBadRequest)
+}
+
 func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 
 	kc := *this.KeepClient

commit d035296164d63af58bcbb75d81b7cf6fe53f077c
Merge: e08308d 2099a0a
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri May 23 10:27:59 2014 -0400

    Merge branch 'master' into 1885-keep-proxy


commit e08308d3b0797264219a0a5e9967e8e0ec2e9202
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri May 23 10:26:31 2014 -0400

    1885: Fix bug in not returning from the handler on failed authorization.
    Improved logging (which uncovered the previous bug.)  Tweaked go.sh and tests.

diff --git a/sdk/go/go.sh b/sdk/go/go.sh
index 89f81fb..5553567 100755
--- a/sdk/go/go.sh
+++ b/sdk/go/go.sh
@@ -1,6 +1,11 @@
 #! /bin/sh
 
-rootdir=$(dirname $0)
+# Wraps the 'go' executable with some environment setup.  Sets GOPATH, creates
+# 'pkg' and 'bin' directories, automatically installs dependencies, then runs
+# the underlying 'go' executable with any command line parameters provided to
+# the script.
+
+rootdir=$(readlink -f $(dirname $0))
 GOPATH=$rootdir:$GOPATH
 export GOPATH
 
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go
index dadf8bf..5ba6257 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@ -24,6 +24,9 @@ var BlockNotFound = errors.New("Block not found")
 var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
 var OversizeBlockError = errors.New("Block too big")
 
+const X_Keep_Desired_Replicas = "X-Keep-Desired-Replicas"
+const X_Keep_Replicas_Stored = "X-Keep-Replicas-Stored"
+
 // Information about Arvados and Keep servers.
 type KeepClient struct {
 	ApiServer     string
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
index 395603d..291d8f8 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
@@ -44,8 +44,12 @@ func (s *ServerRequiredSuite) SetUpSuite(c *C) {
 		c.Skip("Skipping tests that require server")
 	} else {
 		os.Chdir(pythonDir())
-		exec.Command("python", "run_test_server.py", "start").Run()
-		exec.Command("python", "run_test_server.py", "start_keep").Run()
+		if err := exec.Command("python", "run_test_server.py", "start").Run(); err != nil {
+			panic("'python run_test_server.py start' returned error")
+		}
+		if err := exec.Command("python", "run_test_server.py", "start_keep").Run(); err != nil {
+			panic("'python run_test_server.py start_keep' returned error")
+		}
 	}
 }
 
diff --git a/sdk/go/src/arvados.org/keepclient/support.go b/sdk/go/src/arvados.org/keepclient/support.go
index 38669a1..f924137 100644
--- a/sdk/go/src/arvados.org/keepclient/support.go
+++ b/sdk/go/src/arvados.org/keepclient/support.go
@@ -156,7 +156,7 @@ type uploadStatus struct {
 func (this KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
 	upload_status chan<- uploadStatus, expectedLength int64) {
 
-	log.Printf("Uploading to %s", host)
+	log.Printf("Uploading %s to %s", hash, host)
 
 	var req *http.Request
 	var err error
@@ -175,7 +175,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
 	req.Header.Add("Content-Type", "application/octet-stream")
 
 	if this.Using_proxy {
-		req.Header.Add("X-Keep-Desired-Replicas", fmt.Sprint(this.Want_replicas))
+		req.Header.Add(X_Keep_Desired_Replicas, fmt.Sprint(this.Want_replicas))
 	}
 
 	req.Body = body
@@ -188,7 +188,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
 	}
 
 	rep := 1
-	if xr := resp.Header.Get("X-Keep-Replicas-Stored"); xr != "" {
+	if xr := resp.Header.Get(X_Keep_Replicas_Stored); xr != "" {
 		fmt.Sscanf(xr, "%d", &rep)
 	}
 
@@ -248,7 +248,7 @@ func (this KeepClient) putReplicas(
 				status.url, status.err)
 		}
 		active -= 1
-		log.Printf("Upload status code: %v remaining replicas: %v active: %v", status.statusCode, remaining_replicas, active)
+		log.Printf("Upload to %v status code: %v remaining replicas: %v active: %v", status.url, status.statusCode, remaining_replicas, active)
 	}
 
 	return this.Want_replicas, nil
diff --git a/services/keep/go.sh b/services/keep/go.sh
index 177b27d..156fe90 100755
--- a/services/keep/go.sh
+++ b/services/keep/go.sh
@@ -1,6 +1,11 @@
 #! /bin/sh
 
-rootdir=$(dirname $0)
+# Wraps the 'go' executable with some environment setup.  Sets GOPATH, creates
+# 'pkg' and 'bin' directories, automatically installs dependencies, then runs
+# the underlying 'go' executable with any command line parameters provided to
+# the script.
+
+rootdir=$(readlink -f $(dirname $0))
 GOPATH=$rootdir:$rootdir/../../sdk/go:$GOPATH
 export GOPATH
 
diff --git a/services/keep/src/arvados.org/keepproxy/keepproxy.go b/services/keep/src/arvados.org/keepproxy/keepproxy.go
index b914f47..9e0b2ff 100644
--- a/services/keep/src/arvados.org/keepproxy/keepproxy.go
+++ b/services/keep/src/arvados.org/keepproxy/keepproxy.go
@@ -63,15 +63,9 @@ func main() {
 
 	flag.Parse()
 
-	/*if no_get == false {
-		log.Print("Must specify -no-get")
-		return
-	}*/
-
 	kc, err := keepclient.MakeKeepClient()
 	if err != nil {
-		log.Print(err)
-		return
+		log.Fatal(err)
 	}
 
 	if pidfile != "" {
@@ -88,8 +82,7 @@ func main() {
 
 	listener, err = net.Listen("tcp", listen)
 	if err != nil {
-		log.Printf("Could not listen on %v", listen)
-		return
+		log.Fatalf("Could not listen on %v", listen)
 	}
 
 	go RefreshServicesList(&kc)
@@ -143,13 +136,25 @@ func (this *ApiTokenCache) RecallToken(token string) bool {
 	}
 }
 
+func GetRemoteAddress(req *http.Request) string {
+	if realip := req.Header.Get("X-Real-IP"); realip != "" {
+		if forwarded := req.Header.Get("X-Forwarded-For"); forwarded != realip {
+			return fmt.Sprintf("%s (X-Forwarded-For %s)", realip, forwarded)
+		} else {
+			return realip
+		}
+	}
+	return req.RemoteAddr
+}
+
 func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, req *http.Request) bool {
-	if req.Header.Get("Authorization") == "" {
+	var auth string
+	if auth = req.Header.Get("Authorization"); auth == "" {
 		return false
 	}
 
 	var tok string
-	_, err := fmt.Sscanf(req.Header.Get("Authorization"), "OAuth2 %s", &tok)
+	_, err := fmt.Sscanf(auth, "OAuth2 %s", &tok)
 	if err != nil {
 		// Scanning error
 		return false
@@ -164,7 +169,7 @@ func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, re
 
 	if usersreq, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/users/current", kc.ApiServer), nil); err != nil {
 		// Can't construct the request
-		log.Print("CheckAuthorizationHeader error: %v", err)
+		log.Printf("%s: CheckAuthorizationHeader error: %v", GetRemoteAddress(req), err)
 		return false
 	}
 
@@ -175,12 +180,13 @@ func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, re
 	var resp *http.Response
 	if resp, err = kc.Client.Do(usersreq); err != nil {
 		// Something else failed
-		log.Print("CheckAuthorizationHeader error: %v", err)
+		log.Printf("%s: CheckAuthorizationHeader error connecting to API server: %v", GetRemoteAddress(req), err.Error())
 		return false
 	}
 
 	if resp.StatusCode != http.StatusOK {
 		// Bad status
+		log.Printf("%s: CheckAuthorizationHeader API server responded: %v", GetRemoteAddress(req), resp.Status)
 		return false
 	}
 
@@ -234,14 +240,17 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 
 	kc := *this.KeepClient
 
-	if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
-		http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
-	}
-
 	hash := mux.Vars(req)["hash"]
 	signature := mux.Vars(req)["signature"]
 	timestamp := mux.Vars(req)["timestamp"]
 
+	log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, hash)
+
+	if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
+		http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
+		return
+	}
+
 	var reader io.ReadCloser
 	var err error
 	var blocklen int64
@@ -258,25 +267,32 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 	switch err {
 	case nil:
 		if reader != nil {
-			io.Copy(resp, reader)
+			n, err2 := io.Copy(resp, reader)
+			if n != blocklen {
+				log.Printf("%s: %s %s mismatched return %v with Content-Length %v error", GetRemoteAddress(req), req.Method, hash, n, blocklen, err.Error())
+			} else if err2 == nil {
+				log.Printf("%s: %s %s success returned %v bytes", GetRemoteAddress(req), req.Method, hash, n)
+			} else {
+				log.Printf("%s: %s %s returned %v bytes error %v", GetRemoteAddress(req), req.Method, hash, n, err.Error())
+			}
+		} else {
+			log.Printf("%s: %s %s success", GetRemoteAddress(req), req.Method, hash)
 		}
 	case keepclient.BlockNotFound:
 		http.Error(resp, "Not found", http.StatusNotFound)
 	default:
 		http.Error(resp, err.Error(), http.StatusBadGateway)
 	}
+
+	if err != nil {
+		log.Printf("%s: %s %s error %s", GetRemoteAddress(req), req.Method, hash, err.Error())
+	}
 }
 
 func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 
-	log.Print("PutBlockHandler start")
-
 	kc := *this.KeepClient
 
-	if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
-		http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
-	}
-
 	hash := mux.Vars(req)["hash"]
 
 	var contentLength int64 = -1
@@ -288,15 +304,22 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 
 	}
 
+	log.Printf("%s: %s %s Content-Length %v", GetRemoteAddress(req), req.Method, hash, contentLength)
+
 	if contentLength < 1 {
 		http.Error(resp, "Must include Content-Length header", http.StatusLengthRequired)
 		return
 	}
 
+	if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
+		http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
+		return
+	}
+
 	// Check if the client specified the number of replicas
 	if req.Header.Get("X-Keep-Desired-Replicas") != "" {
 		var r int
-		_, err := fmt.Sscanf(req.Header.Get("X-Keep-Desired-Replicas"), "%d", &r)
+		_, err := fmt.Sscanf(req.Header.Get(keepclient.X_Keep_Desired_Replicas), "%d", &r)
 		if err != nil {
 			kc.Want_replicas = r
 		}
@@ -305,14 +328,13 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 	// Now try to put the block through
 	replicas, err := kc.PutHR(hash, req.Body, contentLength)
 
-	log.Printf("Replicas stored: %v err: %v", replicas, err)
-
 	// Tell the client how many successful PUTs we accomplished
-	resp.Header().Set("X-Keep-Replicas-Stored", fmt.Sprintf("%d", replicas))
+	resp.Header().Set(keepclient.X_Keep_Replicas_Stored, fmt.Sprintf("%d", replicas))
 
 	switch err {
 	case nil:
 		// Default will return http.StatusOK
+		log.Printf("%s: %s %s finished, stored %v replicas (desired %v)", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas)
 
 	case keepclient.OversizeBlockError:
 		// Too much data
@@ -332,4 +354,8 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 		http.Error(resp, err.Error(), http.StatusBadGateway)
 	}
 
+	if err != nil {
+		log.Printf("%s: %s %s stored %v replicas (desired %v) got error %v", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas, err.Error())
+	}
+
 }
diff --git a/services/keep/src/arvados.org/keepproxy/keepproxy_test.go b/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
index d8abda7..f03c94f 100644
--- a/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
+++ b/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
@@ -39,8 +39,13 @@ func (s *ServerRequiredSuite) SetUpSuite(c *C) {
 	defer os.Chdir(cwd)
 
 	os.Chdir(pythonDir())
-	exec.Command("python", "run_test_server.py", "start").Run()
-	exec.Command("python", "run_test_server.py", "start_keep").Run()
+
+	if err := exec.Command("python", "run_test_server.py", "start").Run(); err != nil {
+		panic("'python run_test_server.py start' returned error")
+	}
+	if err := exec.Command("python", "run_test_server.py", "start_keep").Run(); err != nil {
+		panic("'python run_test_server.py start_keep' returned error")
+	}
 
 	os.Setenv("ARVADOS_API_HOST", "localhost:3001")
 	os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")

commit 5459127f0ce9d3bf6ee12a034d9a7df0f4f75a39
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu May 22 14:51:56 2014 -0400

    1885: Made ServiceRoots atomically updatable, so that KeepProxy can poll for
    changes in the services list without disrupting any active flows.

diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go
index 91989bd..dadf8bf 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@ -11,6 +11,10 @@ import (
 	"io/ioutil"
 	"net/http"
 	"os"
+	"sort"
+	"sync"
+	"sync/atomic"
+	"unsafe"
 )
 
 // A Keep "block" is 64MB.
@@ -25,11 +29,12 @@ type KeepClient struct {
 	ApiServer     string
 	ApiToken      string
 	ApiInsecure   bool
-	Service_roots []string
 	Want_replicas int
 	Client        *http.Client
 	Using_proxy   bool
 	External      bool
+	service_roots *[]string
+	lock          sync.Mutex
 }
 
 // Create a new KeepClient, initialized with standard Arvados environment
@@ -50,7 +55,7 @@ func MakeKeepClient() (kc KeepClient, err error) {
 		Using_proxy: false,
 		External:    external}
 
-	err = (&kc).discoverKeepServers()
+	err = (&kc).DiscoverKeepServers()
 
 	return kc, err
 }
@@ -207,3 +212,22 @@ func (this KeepClient) AuthorizedAsk(hash string, signature string,
 	return 0, "", BlockNotFound
 
 }
+
+// Atomically read the service_roots field.
+func (this *KeepClient) ServiceRoots() []string {
+	r := (*[]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots))))
+	return *r
+}
+
+// Atomically update the service_roots field.  Enables you to update
+// service_roots without disrupting any GET or PUT operations that might
+// already be in progress.
+func (this *KeepClient) SetServiceRoots(svc []string) {
+	// Must be sorted for ShuffledServiceRoots() to produce consistent
+	// results.
+	roots := make([]string, len(svc))
+	copy(roots, svc)
+	sort.Strings(roots)
+	atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)),
+		unsafe.Pointer(&roots))
+}
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
index 600d739..395603d 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
@@ -13,7 +13,6 @@ import (
 	"net/http"
 	"os"
 	"os/exec"
-	"sort"
 	"strings"
 	"testing"
 )
@@ -75,13 +74,14 @@ func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
 	c.Check(kc.Client.Transport.(*http.Transport).TLSClientConfig.InsecureSkipVerify, Equals, true)
 
 	c.Assert(err, Equals, nil)
-	c.Check(len(kc.Service_roots), Equals, 2)
-	c.Check(kc.Service_roots[0], Equals, "http://localhost:25107")
-	c.Check(kc.Service_roots[1], Equals, "http://localhost:25108")
+	c.Check(len(kc.ServiceRoots()), Equals, 2)
+	c.Check(kc.ServiceRoots()[0], Equals, "http://localhost:25107")
+	c.Check(kc.ServiceRoots()[1], Equals, "http://localhost:25108")
 }
 
 func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
-	kc := KeepClient{Service_roots: []string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"}}
+	kc := KeepClient{}
+	kc.SetServiceRoots([]string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"})
 
 	// "foo" acbd18db4cc2f85cedef654fccc4a4d8
 	foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
@@ -265,16 +265,16 @@ func (s *StandaloneSuite) TestPutB(c *C) {
 
 	kc.Want_replicas = 2
 	kc.ApiToken = "abc123"
-	kc.Service_roots = make([]string, 5)
+	service_roots := make([]string, 5)
 
 	ks := RunSomeFakeKeepServers(st, 5, 2990)
 
 	for i := 0; i < len(ks); i += 1 {
-		kc.Service_roots[i] = ks[i].url
+		service_roots[i] = ks[i].url
 		defer ks[i].listener.Close()
 	}
 
-	sort.Strings(kc.Service_roots)
+	kc.SetServiceRoots(service_roots)
 
 	kc.PutB([]byte("foo"))
 
@@ -306,16 +306,16 @@ func (s *StandaloneSuite) TestPutHR(c *C) {
 
 	kc.Want_replicas = 2
 	kc.ApiToken = "abc123"
-	kc.Service_roots = make([]string, 5)
+	service_roots := make([]string, 5)
 
 	ks := RunSomeFakeKeepServers(st, 5, 2990)
 
 	for i := 0; i < len(ks); i += 1 {
-		kc.Service_roots[i] = ks[i].url
+		service_roots[i] = ks[i].url
 		defer ks[i].listener.Close()
 	}
 
-	sort.Strings(kc.Service_roots)
+	kc.SetServiceRoots(service_roots)
 
 	reader, writer := io.Pipe()
 
@@ -359,21 +359,21 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) {
 
 	kc.Want_replicas = 2
 	kc.ApiToken = "abc123"
-	kc.Service_roots = make([]string, 5)
+	service_roots := make([]string, 5)
 
 	ks1 := RunSomeFakeKeepServers(st, 4, 2990)
 	ks2 := RunSomeFakeKeepServers(fh, 1, 2995)
 
 	for i, k := range ks1 {
-		kc.Service_roots[i] = k.url
+		service_roots[i] = k.url
 		defer k.listener.Close()
 	}
 	for i, k := range ks2 {
-		kc.Service_roots[len(ks1)+i] = k.url
+		service_roots[len(ks1)+i] = k.url
 		defer k.listener.Close()
 	}
 
-	sort.Strings(kc.Service_roots)
+	kc.SetServiceRoots(service_roots)
 
 	shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
 
@@ -407,21 +407,21 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
 
 	kc.Want_replicas = 2
 	kc.ApiToken = "abc123"
-	kc.Service_roots = make([]string, 5)
+	service_roots := make([]string, 5)
 
 	ks1 := RunSomeFakeKeepServers(st, 1, 2990)
 	ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
 
 	for i, k := range ks1 {
-		kc.Service_roots[i] = k.url
+		service_roots[i] = k.url
 		defer k.listener.Close()
 	}
 	for i, k := range ks2 {
-		kc.Service_roots[len(ks1)+i] = k.url
+		service_roots[len(ks1)+i] = k.url
 		defer k.listener.Close()
 	}
 
-	sort.Strings(kc.Service_roots)
+	kc.SetServiceRoots(service_roots)
 
 	shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
 
@@ -464,7 +464,7 @@ func (s *StandaloneSuite) TestGet(c *C) {
 
 	kc, _ := MakeKeepClient()
 	kc.ApiToken = "abc123"
-	kc.Service_roots = []string{url}
+	kc.SetServiceRoots([]string{url})
 
 	r, n, url2, err := kc.Get(hash)
 	defer r.Close()
@@ -489,7 +489,7 @@ func (s *StandaloneSuite) TestGetFail(c *C) {
 
 	kc, _ := MakeKeepClient()
 	kc.ApiToken = "abc123"
-	kc.Service_roots = []string{url}
+	kc.SetServiceRoots([]string{url})
 
 	r, n, url2, err := kc.Get(hash)
 	c.Check(err, Equals, BlockNotFound)
@@ -518,7 +518,7 @@ func (s *StandaloneSuite) TestChecksum(c *C) {
 
 	kc, _ := MakeKeepClient()
 	kc.ApiToken = "abc123"
-	kc.Service_roots = []string{url}
+	kc.SetServiceRoots([]string{url})
 
 	r, n, _, err := kc.Get(barhash)
 	_, err = ioutil.ReadAll(r)
@@ -550,21 +550,21 @@ func (s *StandaloneSuite) TestGetWithFailures(c *C) {
 
 	kc, _ := MakeKeepClient()
 	kc.ApiToken = "abc123"
-	kc.Service_roots = make([]string, 5)
+	service_roots := make([]string, 5)
 
 	ks1 := RunSomeFakeKeepServers(st, 1, 2990)
 	ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
 
 	for i, k := range ks1 {
-		kc.Service_roots[i] = k.url
+		service_roots[i] = k.url
 		defer k.listener.Close()
 	}
 	for i, k := range ks2 {
-		kc.Service_roots[len(ks1)+i] = k.url
+		service_roots[len(ks1)+i] = k.url
 		defer k.listener.Close()
 	}
 
-	sort.Strings(kc.Service_roots)
+	kc.SetServiceRoots(service_roots)
 
 	r, n, url2, err := kc.Get(hash)
 	<-fh.handled
@@ -635,15 +635,17 @@ func (s *StandaloneSuite) TestPutProxy(c *C) {
 	kc.Want_replicas = 2
 	kc.Using_proxy = true
 	kc.ApiToken = "abc123"
-	kc.Service_roots = make([]string, 1)
+	service_roots := make([]string, 1)
 
 	ks1 := RunSomeFakeKeepServers(st, 1, 2990)
 
 	for i, k := range ks1 {
-		kc.Service_roots[i] = k.url
+		service_roots[i] = k.url
 		defer k.listener.Close()
 	}
 
+	kc.SetServiceRoots(service_roots)
+
 	_, replicas, err := kc.PutB([]byte("foo"))
 	<-st.handled
 
@@ -663,14 +665,15 @@ func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
 	kc.Want_replicas = 3
 	kc.Using_proxy = true
 	kc.ApiToken = "abc123"
-	kc.Service_roots = make([]string, 1)
+	service_roots := make([]string, 1)
 
 	ks1 := RunSomeFakeKeepServers(st, 1, 2990)
 
 	for i, k := range ks1 {
-		kc.Service_roots[i] = k.url
+		service_roots[i] = k.url
 		defer k.listener.Close()
 	}
+	kc.SetServiceRoots(service_roots)
 
 	_, replicas, err := kc.PutB([]byte("foo"))
 	<-st.handled
diff --git a/sdk/go/src/arvados.org/keepclient/support.go b/sdk/go/src/arvados.org/keepclient/support.go
index ef4a8e1..38669a1 100644
--- a/sdk/go/src/arvados.org/keepclient/support.go
+++ b/sdk/go/src/arvados.org/keepclient/support.go
@@ -10,7 +10,6 @@ import (
 	"log"
 	"net/http"
 	"os"
-	"sort"
 	"strconv"
 )
 
@@ -21,10 +20,9 @@ type keepDisk struct {
 	SvcType  string `json:"service_type"`
 }
 
-func (this *KeepClient) discoverKeepServers() error {
+func (this *KeepClient) DiscoverKeepServers() error {
 	if prx := os.Getenv("ARVADOS_KEEP_PROXY"); prx != "" {
-		this.Service_roots = make([]string, 1)
-		this.Service_roots[0] = prx
+		this.SetServiceRoots([]string{prx})
 		this.Using_proxy = true
 		return nil
 	}
@@ -72,7 +70,7 @@ func (this *KeepClient) discoverKeepServers() error {
 	}
 
 	listed := make(map[string]bool)
-	this.Service_roots = make([]string, 0, len(m.Items))
+	service_roots := make([]string, 0, len(m.Items))
 
 	for _, element := range m.Items {
 		n := ""
@@ -87,16 +85,14 @@ func (this *KeepClient) discoverKeepServers() error {
 		// Skip duplicates
 		if !listed[url] {
 			listed[url] = true
-			this.Service_roots = append(this.Service_roots, url)
+			service_roots = append(service_roots, url)
 		}
 		if element.SvcType == "proxy" {
 			this.Using_proxy = true
 		}
 	}
 
-	// Must be sorted for ShuffledServiceRoots() to produce consistent
-	// results.
-	sort.Strings(this.Service_roots)
+	this.SetServiceRoots(service_roots)
 
 	return nil
 }
@@ -111,11 +107,12 @@ func (this KeepClient) shuffledServiceRoots(hash string) (pseq []string) {
 	seed := hash
 
 	// Keep servers still to be added to the ordering
-	pool := make([]string, len(this.Service_roots))
-	copy(pool, this.Service_roots)
+	service_roots := this.ServiceRoots()
+	pool := make([]string, len(service_roots))
+	copy(pool, service_roots)
 
 	// output probe sequence
-	pseq = make([]string, 0, len(this.Service_roots))
+	pseq = make([]string, 0, len(service_roots))
 
 	// iterate while there are servers left to be assigned
 	for len(pool) > 0 {
diff --git a/services/keep/src/arvados.org/keepproxy/keepproxy.go b/services/keep/src/arvados.org/keepproxy/keepproxy.go
index ed33ac9..b914f47 100644
--- a/services/keep/src/arvados.org/keepproxy/keepproxy.go
+++ b/services/keep/src/arvados.org/keepproxy/keepproxy.go
@@ -92,8 +92,10 @@ func main() {
 		return
 	}
 
+	go RefreshServicesList(&kc)
+
 	// Start listening for requests.
-	http.Serve(listener, MakeRESTRouter(!no_get, !no_put, kc))
+	http.Serve(listener, MakeRESTRouter(!no_get, !no_put, &kc))
 }
 
 type ApiTokenCache struct {
@@ -102,6 +104,14 @@ type ApiTokenCache struct {
 	expireTime int64
 }
 
+// Refresh the keep service list every five minutes.
+func RefreshServicesList(kc *keepclient.KeepClient) {
+	for {
+		time.Sleep(300 * time.Second)
+		kc.DiscoverKeepServers()
+	}
+}
+
 // Cache the token and set an expire time.  If we already have an expire time
 // on the token, it is not updated.
 func (this *ApiTokenCache) RememberToken(token string) {
@@ -181,12 +191,12 @@ func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, re
 }
 
 type GetBlockHandler struct {
-	keepclient.KeepClient
+	*keepclient.KeepClient
 	*ApiTokenCache
 }
 
 type PutBlockHandler struct {
-	keepclient.KeepClient
+	*keepclient.KeepClient
 	*ApiTokenCache
 }
 
@@ -197,7 +207,7 @@ type PutBlockHandler struct {
 func MakeRESTRouter(
 	enable_get bool,
 	enable_put bool,
-	kc keepclient.KeepClient) *mux.Router {
+	kc *keepclient.KeepClient) *mux.Router {
 
 	t := &ApiTokenCache{tokens: make(map[string]int64), expireTime: 300}
 
@@ -222,7 +232,9 @@ func MakeRESTRouter(
 
 func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 
-	if !CheckAuthorizationHeader(this.KeepClient, this.ApiTokenCache, req) {
+	kc := *this.KeepClient
+
+	if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
 		http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
 	}
 
@@ -235,10 +247,10 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 	var blocklen int64
 
 	if req.Method == "GET" {
-		reader, blocklen, _, err = this.KeepClient.AuthorizedGet(hash, signature, timestamp)
+		reader, blocklen, _, err = kc.AuthorizedGet(hash, signature, timestamp)
 		defer reader.Close()
 	} else if req.Method == "HEAD" {
-		blocklen, _, err = this.KeepClient.AuthorizedAsk(hash, signature, timestamp)
+		blocklen, _, err = kc.AuthorizedAsk(hash, signature, timestamp)
 	}
 
 	resp.Header().Set("Content-Length", fmt.Sprint(blocklen))
@@ -259,7 +271,9 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 
 	log.Print("PutBlockHandler start")
 
-	if !CheckAuthorizationHeader(this.KeepClient, this.ApiTokenCache, req) {
+	kc := *this.KeepClient
+
+	if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
 		http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
 	}
 
@@ -284,12 +298,12 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 		var r int
 		_, err := fmt.Sscanf(req.Header.Get("X-Keep-Desired-Replicas"), "%d", &r)
 		if err != nil {
-			this.KeepClient.Want_replicas = r
+			kc.Want_replicas = r
 		}
 	}
 
 	// Now try to put the block through
-	replicas, err := this.KeepClient.PutHR(hash, req.Body, contentLength)
+	replicas, err := kc.PutHR(hash, req.Body, contentLength)
 
 	log.Printf("Replicas stored: %v err: %v", replicas, err)
 
diff --git a/services/keep/src/arvados.org/keepproxy/keepproxy_test.go b/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
index af1377b..d8abda7 100644
--- a/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
+++ b/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
@@ -109,8 +109,8 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
 	kc, err := keepclient.MakeKeepClient()
 	c.Check(kc.External, Equals, true)
 	c.Check(kc.Using_proxy, Equals, true)
-	c.Check(len(kc.Service_roots), Equals, 1)
-	c.Check(kc.Service_roots[0], Equals, "http://localhost:29950")
+	c.Check(len(kc.ServiceRoots()), Equals, 1)
+	c.Check(kc.ServiceRoots()[0], Equals, "http://localhost:29950")
 	c.Check(err, Equals, nil)
 	os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
 
@@ -159,8 +159,8 @@ func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
 	kc.ApiToken = "123xyz"
 	c.Check(kc.External, Equals, true)
 	c.Check(kc.Using_proxy, Equals, true)
-	c.Check(len(kc.Service_roots), Equals, 1)
-	c.Check(kc.Service_roots[0], Equals, "http://localhost:29950")
+	c.Check(len(kc.ServiceRoots()), Equals, 1)
+	c.Check(kc.ServiceRoots()[0], Equals, "http://localhost:29950")
 	c.Check(err, Equals, nil)
 	os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
 

commit cbf78e13973654510c5cc33b6420667611ef5245
Merge: be92cf1 d582312
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu May 22 13:38:10 2014 -0400

    Merge branch 'master' into 1885-keep-proxy refs #1885


-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list