[ARVADOS] updated: c98ae6385a00eaaf4a095df092aea84f02d50ce8
git at public.curoverse.com
git at public.curoverse.com
Fri May 23 15:52:27 EDT 2014
Summary of changes:
apps/workbench/app/assets/javascripts/editable.js | 4 ++
.../app/assets/javascripts/pipeline_instances.js | 79 +++++++++++-----------
apps/workbench/app/assets/javascripts/selection.js | 2 +-
.../controllers/pipeline_instances_controller.rb | 6 +-
apps/workbench/app/helpers/application_helper.rb | 4 +-
.../pipeline_instances/_show_components.html.erb | 8 ++-
.../_show_components_editable.html.erb | 2 +-
.../views/pipeline_instances/_show_inputs.html.erb | 50 ++++++++++++++
.../views/pipeline_templates/_show_recent.html.erb | 39 +++++------
.../test/integration/pipeline_instances_test.rb | 56 +++++++++++++++
apps/workbench/test/integration/smoke_test.rb | 4 ++
apps/workbench/test/integration_helper.rb | 16 +++++
services/api/test/fixtures/collections.yml | 8 +--
services/api/test/fixtures/pipeline_templates.yml | 4 +-
14 files changed, 209 insertions(+), 73 deletions(-)
create mode 100644 apps/workbench/app/views/pipeline_instances/_show_inputs.html.erb
create mode 100644 apps/workbench/test/integration/pipeline_instances_test.rb
via c98ae6385a00eaaf4a095df092aea84f02d50ce8 (commit)
via ca77ae5cb94c31232abc8923174280fd848ef408 (commit)
via 43ea53c8df3d9ea64615cb05d0909d256b9ae38c (commit)
via d035296164d63af58bcbb75d81b7cf6fe53f077c (commit)
via e08308d3b0797264219a0a5e9967e8e0ec2e9202 (commit)
via 5459127f0ce9d3bf6ee12a034d9a7df0f4f75a39 (commit)
via cbf78e13973654510c5cc33b6420667611ef5245 (commit)
via b28565c8aa08cbf70762fa69e49c5067fcb57e96 (commit)
via 140944b37fac0f631e8366222b2cbc5d5373ba8a (commit)
via e35cb0f48bbead47ad0c628c280bfc2cc32035ac (commit)
via d61f27791aa739dfc93c2f953236fffd4f0fcf6c (commit)
via 89096c06922b406f7157082de410e40960f5c73e (commit)
via 9d6ce80869e187a7c5a574ea5a5272bb89dd81ce (commit)
via e2dc7f5b9068e23822391bd3cf987b6f14e83ec0 (commit)
via 1797ee0dea063ff59361740ee512c84d086dacaf (commit)
from b646cec74484bf07a54f4be2de712f50dc387aa0 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit c98ae6385a00eaaf4a095df092aea84f02d50ce8
Merge: ca77ae5 b646cec
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri May 23 15:52:20 2014 -0400
Merge branch '1885-keep-proxy' of git.curoverse.com:arvados into 1885-keep-proxy refs #1885
# Please enter a commit message to explain why this merge is necessary,
# especially if it merges an updated upstream into a topic branch.
#
# Lines starting with '#' will be ignored, and an empty message aborts
# the commit.
commit ca77ae5cb94c31232abc8923174280fd848ef408
Merge: b28565c 43ea53c
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri May 23 15:37:57 2014 -0400
Merge branch '1885-keep-proxy' closes #1885
commit 43ea53c8df3d9ea64615cb05d0909d256b9ae38c
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri May 23 14:08:53 2014 -0400
1885: Added logging of invalid requests. Added logging when the server list is
updated. Improved KeepClient error reporting.
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go
index 5ba6257..8d26b32 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@ -23,6 +23,8 @@ const BLOCKSIZE = 64 * 1024 * 1024
var BlockNotFound = errors.New("Block not found")
var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
var OversizeBlockError = errors.New("Block too big")
+var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST")
+var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN")
const X_Keep_Desired_Replicas = "X-Keep-Desired-Replicas"
const X_Keep_Replicas_Stored = "X-Keep-Replicas-Stored"
@@ -58,6 +60,13 @@ func MakeKeepClient() (kc KeepClient, err error) {
Using_proxy: false,
External: external}
+ if os.Getenv("ARVADOS_API_HOST") == "" {
+ return kc, MissingArvadosApiHost
+ }
+ if os.Getenv("ARVADOS_API_TOKEN") == "" {
+ return kc, MissingArvadosApiToken
+ }
+
err = (&kc).DiscoverKeepServers()
return kc, err
diff --git a/sdk/go/src/arvados.org/keepclient/support.go b/sdk/go/src/arvados.org/keepclient/support.go
index f924137..913f7c7 100644
--- a/sdk/go/src/arvados.org/keepclient/support.go
+++ b/sdk/go/src/arvados.org/keepclient/support.go
@@ -47,7 +47,7 @@ func (this *KeepClient) DiscoverKeepServers() error {
return err
}
- if resp.StatusCode != 200 {
+ if resp.StatusCode != http.StatusOK {
// fall back on keep disks
if req, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/keep_disks", this.ApiServer), nil); err != nil {
return err
@@ -56,6 +56,9 @@ func (this *KeepClient) DiscoverKeepServers() error {
if resp, err = this.Client.Do(req); err != nil {
return err
}
+ if resp.StatusCode != http.StatusOK {
+ return errors.New(resp.Status)
+ }
}
type svcList struct {
diff --git a/services/keep/src/arvados.org/keepproxy/keepproxy.go b/services/keep/src/arvados.org/keepproxy/keepproxy.go
index 9e0b2ff..2d7e276 100644
--- a/services/keep/src/arvados.org/keepproxy/keepproxy.go
+++ b/services/keep/src/arvados.org/keepproxy/keepproxy.go
@@ -65,7 +65,7 @@ func main() {
kc, err := keepclient.MakeKeepClient()
if err != nil {
- log.Fatal(err)
+ log.Fatalf("Error setting up keep client %s", err.Error())
}
if pidfile != "" {
@@ -87,6 +87,8 @@ func main() {
go RefreshServicesList(&kc)
+ log.Printf("Arvados Keep proxy started listening on %v with server list %v", listener.Addr(), kc.ServiceRoots())
+
// Start listening for requests.
http.Serve(listener, MakeRESTRouter(!no_get, !no_put, &kc))
}
@@ -101,7 +103,14 @@ type ApiTokenCache struct {
func RefreshServicesList(kc *keepclient.KeepClient) {
for {
time.Sleep(300 * time.Second)
+ oldservices := kc.ServiceRoots()
kc.DiscoverKeepServers()
+ newservices := kc.ServiceRoots()
+ s1 := fmt.Sprint(oldservices)
+ s2 := fmt.Sprint(newservices)
+ if s1 != s2 {
+ log.Printf("Updated server list to %v", s2)
+ }
}
}
@@ -167,7 +176,7 @@ func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, re
var usersreq *http.Request
- if usersreq, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/users/current", kc.ApiServer), nil); err != nil {
+ if usersreq, err = http.NewRequest("HEAD", fmt.Sprintf("https://%s/arvados/v1/users/current", kc.ApiServer), nil); err != nil {
// Can't construct the request
log.Printf("%s: CheckAuthorizationHeader error: %v", GetRemoteAddress(req), err)
return false
@@ -206,6 +215,8 @@ type PutBlockHandler struct {
*ApiTokenCache
}
+type InvalidPathHandler struct{}
+
// MakeRESTRouter
// Returns a mux.Router that passes GET and PUT requests to the
// appropriate handlers.
@@ -233,9 +244,16 @@ func MakeRESTRouter(
ph.Methods("PUT")
}
+ rest.NotFoundHandler = InvalidPathHandler{}
+
return rest
}
+func (this InvalidPathHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ log.Printf("%s: %s %s unroutable", GetRemoteAddress(req), req.Method, req.URL.Path)
+ http.Error(resp, "Bad request", http.StatusBadRequest)
+}
+
func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
kc := *this.KeepClient
commit d035296164d63af58bcbb75d81b7cf6fe53f077c
Merge: e08308d 2099a0a
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri May 23 10:27:59 2014 -0400
Merge branch 'master' into 1885-keep-proxy
commit e08308d3b0797264219a0a5e9967e8e0ec2e9202
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri May 23 10:26:31 2014 -0400
1885: Fix bug where the handler did not return after a failed authorization.
Improved logging (which uncovered the previous bug). Tweaked go.sh and tests.
diff --git a/sdk/go/go.sh b/sdk/go/go.sh
index 89f81fb..5553567 100755
--- a/sdk/go/go.sh
+++ b/sdk/go/go.sh
@@ -1,6 +1,11 @@
#! /bin/sh
-rootdir=$(dirname $0)
+# Wraps the 'go' executable with some environment setup. Sets GOPATH, creates
+# 'pkg' and 'bin' directories, automatically installs dependencies, then runs
+# the underlying 'go' executable with any command line parameters provided to
+# the script.
+
+rootdir=$(readlink -f $(dirname $0))
GOPATH=$rootdir:$GOPATH
export GOPATH
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go
index dadf8bf..5ba6257 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@ -24,6 +24,9 @@ var BlockNotFound = errors.New("Block not found")
var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
var OversizeBlockError = errors.New("Block too big")
+const X_Keep_Desired_Replicas = "X-Keep-Desired-Replicas"
+const X_Keep_Replicas_Stored = "X-Keep-Replicas-Stored"
+
// Information about Arvados and Keep servers.
type KeepClient struct {
ApiServer string
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
index 395603d..291d8f8 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
@@ -44,8 +44,12 @@ func (s *ServerRequiredSuite) SetUpSuite(c *C) {
c.Skip("Skipping tests that require server")
} else {
os.Chdir(pythonDir())
- exec.Command("python", "run_test_server.py", "start").Run()
- exec.Command("python", "run_test_server.py", "start_keep").Run()
+ if err := exec.Command("python", "run_test_server.py", "start").Run(); err != nil {
+ panic("'python run_test_server.py start' returned error")
+ }
+ if err := exec.Command("python", "run_test_server.py", "start_keep").Run(); err != nil {
+ panic("'python run_test_server.py start_keep' returned error")
+ }
}
}
diff --git a/sdk/go/src/arvados.org/keepclient/support.go b/sdk/go/src/arvados.org/keepclient/support.go
index 38669a1..f924137 100644
--- a/sdk/go/src/arvados.org/keepclient/support.go
+++ b/sdk/go/src/arvados.org/keepclient/support.go
@@ -156,7 +156,7 @@ type uploadStatus struct {
func (this KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
upload_status chan<- uploadStatus, expectedLength int64) {
- log.Printf("Uploading to %s", host)
+ log.Printf("Uploading %s to %s", hash, host)
var req *http.Request
var err error
@@ -175,7 +175,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
req.Header.Add("Content-Type", "application/octet-stream")
if this.Using_proxy {
- req.Header.Add("X-Keep-Desired-Replicas", fmt.Sprint(this.Want_replicas))
+ req.Header.Add(X_Keep_Desired_Replicas, fmt.Sprint(this.Want_replicas))
}
req.Body = body
@@ -188,7 +188,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
}
rep := 1
- if xr := resp.Header.Get("X-Keep-Replicas-Stored"); xr != "" {
+ if xr := resp.Header.Get(X_Keep_Replicas_Stored); xr != "" {
fmt.Sscanf(xr, "%d", &rep)
}
@@ -248,7 +248,7 @@ func (this KeepClient) putReplicas(
status.url, status.err)
}
active -= 1
- log.Printf("Upload status code: %v remaining replicas: %v active: %v", status.statusCode, remaining_replicas, active)
+ log.Printf("Upload to %v status code: %v remaining replicas: %v active: %v", status.url, status.statusCode, remaining_replicas, active)
}
return this.Want_replicas, nil
diff --git a/services/keep/go.sh b/services/keep/go.sh
index 177b27d..156fe90 100755
--- a/services/keep/go.sh
+++ b/services/keep/go.sh
@@ -1,6 +1,11 @@
#! /bin/sh
-rootdir=$(dirname $0)
+# Wraps the 'go' executable with some environment setup. Sets GOPATH, creates
+# 'pkg' and 'bin' directories, automatically installs dependencies, then runs
+# the underlying 'go' executable with any command line parameters provided to
+# the script.
+
+rootdir=$(readlink -f $(dirname $0))
GOPATH=$rootdir:$rootdir/../../sdk/go:$GOPATH
export GOPATH
diff --git a/services/keep/src/arvados.org/keepproxy/keepproxy.go b/services/keep/src/arvados.org/keepproxy/keepproxy.go
index b914f47..9e0b2ff 100644
--- a/services/keep/src/arvados.org/keepproxy/keepproxy.go
+++ b/services/keep/src/arvados.org/keepproxy/keepproxy.go
@@ -63,15 +63,9 @@ func main() {
flag.Parse()
- /*if no_get == false {
- log.Print("Must specify -no-get")
- return
- }*/
-
kc, err := keepclient.MakeKeepClient()
if err != nil {
- log.Print(err)
- return
+ log.Fatal(err)
}
if pidfile != "" {
@@ -88,8 +82,7 @@ func main() {
listener, err = net.Listen("tcp", listen)
if err != nil {
- log.Printf("Could not listen on %v", listen)
- return
+ log.Fatalf("Could not listen on %v", listen)
}
go RefreshServicesList(&kc)
@@ -143,13 +136,25 @@ func (this *ApiTokenCache) RecallToken(token string) bool {
}
}
+func GetRemoteAddress(req *http.Request) string {
+ if realip := req.Header.Get("X-Real-IP"); realip != "" {
+ if forwarded := req.Header.Get("X-Forwarded-For"); forwarded != realip {
+ return fmt.Sprintf("%s (X-Forwarded-For %s)", realip, forwarded)
+ } else {
+ return realip
+ }
+ }
+ return req.RemoteAddr
+}
+
func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, req *http.Request) bool {
- if req.Header.Get("Authorization") == "" {
+ var auth string
+ if auth = req.Header.Get("Authorization"); auth == "" {
return false
}
var tok string
- _, err := fmt.Sscanf(req.Header.Get("Authorization"), "OAuth2 %s", &tok)
+ _, err := fmt.Sscanf(auth, "OAuth2 %s", &tok)
if err != nil {
// Scanning error
return false
@@ -164,7 +169,7 @@ func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, re
if usersreq, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/users/current", kc.ApiServer), nil); err != nil {
// Can't construct the request
- log.Print("CheckAuthorizationHeader error: %v", err)
+ log.Printf("%s: CheckAuthorizationHeader error: %v", GetRemoteAddress(req), err)
return false
}
@@ -175,12 +180,13 @@ func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, re
var resp *http.Response
if resp, err = kc.Client.Do(usersreq); err != nil {
// Something else failed
- log.Print("CheckAuthorizationHeader error: %v", err)
+ log.Printf("%s: CheckAuthorizationHeader error connecting to API server: %v", GetRemoteAddress(req), err.Error())
return false
}
if resp.StatusCode != http.StatusOK {
// Bad status
+ log.Printf("%s: CheckAuthorizationHeader API server responded: %v", GetRemoteAddress(req), resp.Status)
return false
}
@@ -234,14 +240,17 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
kc := *this.KeepClient
- if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
- http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
- }
-
hash := mux.Vars(req)["hash"]
signature := mux.Vars(req)["signature"]
timestamp := mux.Vars(req)["timestamp"]
+ log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, hash)
+
+ if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
+ http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
+ return
+ }
+
var reader io.ReadCloser
var err error
var blocklen int64
@@ -258,25 +267,32 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
switch err {
case nil:
if reader != nil {
- io.Copy(resp, reader)
+ n, err2 := io.Copy(resp, reader)
+ if n != blocklen {
+ log.Printf("%s: %s %s mismatched return %v with Content-Length %v error", GetRemoteAddress(req), req.Method, hash, n, blocklen, err.Error())
+ } else if err2 == nil {
+ log.Printf("%s: %s %s success returned %v bytes", GetRemoteAddress(req), req.Method, hash, n)
+ } else {
+ log.Printf("%s: %s %s returned %v bytes error %v", GetRemoteAddress(req), req.Method, hash, n, err.Error())
+ }
+ } else {
+ log.Printf("%s: %s %s success", GetRemoteAddress(req), req.Method, hash)
}
case keepclient.BlockNotFound:
http.Error(resp, "Not found", http.StatusNotFound)
default:
http.Error(resp, err.Error(), http.StatusBadGateway)
}
+
+ if err != nil {
+ log.Printf("%s: %s %s error %s", GetRemoteAddress(req), req.Method, hash, err.Error())
+ }
}
func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
- log.Print("PutBlockHandler start")
-
kc := *this.KeepClient
- if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
- http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
- }
-
hash := mux.Vars(req)["hash"]
var contentLength int64 = -1
@@ -288,15 +304,22 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
}
+ log.Printf("%s: %s %s Content-Length %v", GetRemoteAddress(req), req.Method, hash, contentLength)
+
if contentLength < 1 {
http.Error(resp, "Must include Content-Length header", http.StatusLengthRequired)
return
}
+ if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
+ http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
+ return
+ }
+
// Check if the client specified the number of replicas
if req.Header.Get("X-Keep-Desired-Replicas") != "" {
var r int
- _, err := fmt.Sscanf(req.Header.Get("X-Keep-Desired-Replicas"), "%d", &r)
+ _, err := fmt.Sscanf(req.Header.Get(keepclient.X_Keep_Desired_Replicas), "%d", &r)
if err != nil {
kc.Want_replicas = r
}
@@ -305,14 +328,13 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
// Now try to put the block through
replicas, err := kc.PutHR(hash, req.Body, contentLength)
- log.Printf("Replicas stored: %v err: %v", replicas, err)
-
// Tell the client how many successful PUTs we accomplished
- resp.Header().Set("X-Keep-Replicas-Stored", fmt.Sprintf("%d", replicas))
+ resp.Header().Set(keepclient.X_Keep_Replicas_Stored, fmt.Sprintf("%d", replicas))
switch err {
case nil:
// Default will return http.StatusOK
+ log.Printf("%s: %s %s finished, stored %v replicas (desired %v)", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas)
case keepclient.OversizeBlockError:
// Too much data
@@ -332,4 +354,8 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
http.Error(resp, err.Error(), http.StatusBadGateway)
}
+ if err != nil {
+ log.Printf("%s: %s %s stored %v replicas (desired %v) got error %v", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas, err.Error())
+ }
+
}
diff --git a/services/keep/src/arvados.org/keepproxy/keepproxy_test.go b/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
index d8abda7..f03c94f 100644
--- a/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
+++ b/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
@@ -39,8 +39,13 @@ func (s *ServerRequiredSuite) SetUpSuite(c *C) {
defer os.Chdir(cwd)
os.Chdir(pythonDir())
- exec.Command("python", "run_test_server.py", "start").Run()
- exec.Command("python", "run_test_server.py", "start_keep").Run()
+
+ if err := exec.Command("python", "run_test_server.py", "start").Run(); err != nil {
+ panic("'python run_test_server.py start' returned error")
+ }
+ if err := exec.Command("python", "run_test_server.py", "start_keep").Run(); err != nil {
+ panic("'python run_test_server.py start_keep' returned error")
+ }
os.Setenv("ARVADOS_API_HOST", "localhost:3001")
os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
commit 5459127f0ce9d3bf6ee12a034d9a7df0f4f75a39
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu May 22 14:51:56 2014 -0400
1885: Made ServiceRoots atomically updatable, so that KeepProxy can poll for
changes in the services list without disrupting any active flows.
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go
index 91989bd..dadf8bf 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@ -11,6 +11,10 @@ import (
"io/ioutil"
"net/http"
"os"
+ "sort"
+ "sync"
+ "sync/atomic"
+ "unsafe"
)
// A Keep "block" is 64MB.
@@ -25,11 +29,12 @@ type KeepClient struct {
ApiServer string
ApiToken string
ApiInsecure bool
- Service_roots []string
Want_replicas int
Client *http.Client
Using_proxy bool
External bool
+ service_roots *[]string
+ lock sync.Mutex
}
// Create a new KeepClient, initialized with standard Arvados environment
@@ -50,7 +55,7 @@ func MakeKeepClient() (kc KeepClient, err error) {
Using_proxy: false,
External: external}
- err = (&kc).discoverKeepServers()
+ err = (&kc).DiscoverKeepServers()
return kc, err
}
@@ -207,3 +212,22 @@ func (this KeepClient) AuthorizedAsk(hash string, signature string,
return 0, "", BlockNotFound
}
+
+// Atomically read the service_roots field.
+func (this *KeepClient) ServiceRoots() []string {
+ r := (*[]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots))))
+ return *r
+}
+
+// Atomically update the service_roots field. Enables you to update
+// service_roots without disrupting any GET or PUT operations that might
+// already be in progress.
+func (this *KeepClient) SetServiceRoots(svc []string) {
+ // Must be sorted for ShuffledServiceRoots() to produce consistent
+ // results.
+ roots := make([]string, len(svc))
+ copy(roots, svc)
+ sort.Strings(roots)
+ atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)),
+ unsafe.Pointer(&roots))
+}
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
index 600d739..395603d 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
@@ -13,7 +13,6 @@ import (
"net/http"
"os"
"os/exec"
- "sort"
"strings"
"testing"
)
@@ -75,13 +74,14 @@ func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
c.Check(kc.Client.Transport.(*http.Transport).TLSClientConfig.InsecureSkipVerify, Equals, true)
c.Assert(err, Equals, nil)
- c.Check(len(kc.Service_roots), Equals, 2)
- c.Check(kc.Service_roots[0], Equals, "http://localhost:25107")
- c.Check(kc.Service_roots[1], Equals, "http://localhost:25108")
+ c.Check(len(kc.ServiceRoots()), Equals, 2)
+ c.Check(kc.ServiceRoots()[0], Equals, "http://localhost:25107")
+ c.Check(kc.ServiceRoots()[1], Equals, "http://localhost:25108")
}
func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
- kc := KeepClient{Service_roots: []string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"}}
+ kc := KeepClient{}
+ kc.SetServiceRoots([]string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"})
// "foo" acbd18db4cc2f85cedef654fccc4a4d8
foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
@@ -265,16 +265,16 @@ func (s *StandaloneSuite) TestPutB(c *C) {
kc.Want_replicas = 2
kc.ApiToken = "abc123"
- kc.Service_roots = make([]string, 5)
+ service_roots := make([]string, 5)
ks := RunSomeFakeKeepServers(st, 5, 2990)
for i := 0; i < len(ks); i += 1 {
- kc.Service_roots[i] = ks[i].url
+ service_roots[i] = ks[i].url
defer ks[i].listener.Close()
}
- sort.Strings(kc.Service_roots)
+ kc.SetServiceRoots(service_roots)
kc.PutB([]byte("foo"))
@@ -306,16 +306,16 @@ func (s *StandaloneSuite) TestPutHR(c *C) {
kc.Want_replicas = 2
kc.ApiToken = "abc123"
- kc.Service_roots = make([]string, 5)
+ service_roots := make([]string, 5)
ks := RunSomeFakeKeepServers(st, 5, 2990)
for i := 0; i < len(ks); i += 1 {
- kc.Service_roots[i] = ks[i].url
+ service_roots[i] = ks[i].url
defer ks[i].listener.Close()
}
- sort.Strings(kc.Service_roots)
+ kc.SetServiceRoots(service_roots)
reader, writer := io.Pipe()
@@ -359,21 +359,21 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) {
kc.Want_replicas = 2
kc.ApiToken = "abc123"
- kc.Service_roots = make([]string, 5)
+ service_roots := make([]string, 5)
ks1 := RunSomeFakeKeepServers(st, 4, 2990)
ks2 := RunSomeFakeKeepServers(fh, 1, 2995)
for i, k := range ks1 {
- kc.Service_roots[i] = k.url
+ service_roots[i] = k.url
defer k.listener.Close()
}
for i, k := range ks2 {
- kc.Service_roots[len(ks1)+i] = k.url
+ service_roots[len(ks1)+i] = k.url
defer k.listener.Close()
}
- sort.Strings(kc.Service_roots)
+ kc.SetServiceRoots(service_roots)
shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
@@ -407,21 +407,21 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
kc.Want_replicas = 2
kc.ApiToken = "abc123"
- kc.Service_roots = make([]string, 5)
+ service_roots := make([]string, 5)
ks1 := RunSomeFakeKeepServers(st, 1, 2990)
ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
for i, k := range ks1 {
- kc.Service_roots[i] = k.url
+ service_roots[i] = k.url
defer k.listener.Close()
}
for i, k := range ks2 {
- kc.Service_roots[len(ks1)+i] = k.url
+ service_roots[len(ks1)+i] = k.url
defer k.listener.Close()
}
- sort.Strings(kc.Service_roots)
+ kc.SetServiceRoots(service_roots)
shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
@@ -464,7 +464,7 @@ func (s *StandaloneSuite) TestGet(c *C) {
kc, _ := MakeKeepClient()
kc.ApiToken = "abc123"
- kc.Service_roots = []string{url}
+ kc.SetServiceRoots([]string{url})
r, n, url2, err := kc.Get(hash)
defer r.Close()
@@ -489,7 +489,7 @@ func (s *StandaloneSuite) TestGetFail(c *C) {
kc, _ := MakeKeepClient()
kc.ApiToken = "abc123"
- kc.Service_roots = []string{url}
+ kc.SetServiceRoots([]string{url})
r, n, url2, err := kc.Get(hash)
c.Check(err, Equals, BlockNotFound)
@@ -518,7 +518,7 @@ func (s *StandaloneSuite) TestChecksum(c *C) {
kc, _ := MakeKeepClient()
kc.ApiToken = "abc123"
- kc.Service_roots = []string{url}
+ kc.SetServiceRoots([]string{url})
r, n, _, err := kc.Get(barhash)
_, err = ioutil.ReadAll(r)
@@ -550,21 +550,21 @@ func (s *StandaloneSuite) TestGetWithFailures(c *C) {
kc, _ := MakeKeepClient()
kc.ApiToken = "abc123"
- kc.Service_roots = make([]string, 5)
+ service_roots := make([]string, 5)
ks1 := RunSomeFakeKeepServers(st, 1, 2990)
ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
for i, k := range ks1 {
- kc.Service_roots[i] = k.url
+ service_roots[i] = k.url
defer k.listener.Close()
}
for i, k := range ks2 {
- kc.Service_roots[len(ks1)+i] = k.url
+ service_roots[len(ks1)+i] = k.url
defer k.listener.Close()
}
- sort.Strings(kc.Service_roots)
+ kc.SetServiceRoots(service_roots)
r, n, url2, err := kc.Get(hash)
<-fh.handled
@@ -635,15 +635,17 @@ func (s *StandaloneSuite) TestPutProxy(c *C) {
kc.Want_replicas = 2
kc.Using_proxy = true
kc.ApiToken = "abc123"
- kc.Service_roots = make([]string, 1)
+ service_roots := make([]string, 1)
ks1 := RunSomeFakeKeepServers(st, 1, 2990)
for i, k := range ks1 {
- kc.Service_roots[i] = k.url
+ service_roots[i] = k.url
defer k.listener.Close()
}
+ kc.SetServiceRoots(service_roots)
+
_, replicas, err := kc.PutB([]byte("foo"))
<-st.handled
@@ -663,14 +665,15 @@ func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
kc.Want_replicas = 3
kc.Using_proxy = true
kc.ApiToken = "abc123"
- kc.Service_roots = make([]string, 1)
+ service_roots := make([]string, 1)
ks1 := RunSomeFakeKeepServers(st, 1, 2990)
for i, k := range ks1 {
- kc.Service_roots[i] = k.url
+ service_roots[i] = k.url
defer k.listener.Close()
}
+ kc.SetServiceRoots(service_roots)
_, replicas, err := kc.PutB([]byte("foo"))
<-st.handled
diff --git a/sdk/go/src/arvados.org/keepclient/support.go b/sdk/go/src/arvados.org/keepclient/support.go
index ef4a8e1..38669a1 100644
--- a/sdk/go/src/arvados.org/keepclient/support.go
+++ b/sdk/go/src/arvados.org/keepclient/support.go
@@ -10,7 +10,6 @@ import (
"log"
"net/http"
"os"
- "sort"
"strconv"
)
@@ -21,10 +20,9 @@ type keepDisk struct {
SvcType string `json:"service_type"`
}
-func (this *KeepClient) discoverKeepServers() error {
+func (this *KeepClient) DiscoverKeepServers() error {
if prx := os.Getenv("ARVADOS_KEEP_PROXY"); prx != "" {
- this.Service_roots = make([]string, 1)
- this.Service_roots[0] = prx
+ this.SetServiceRoots([]string{prx})
this.Using_proxy = true
return nil
}
@@ -72,7 +70,7 @@ func (this *KeepClient) discoverKeepServers() error {
}
listed := make(map[string]bool)
- this.Service_roots = make([]string, 0, len(m.Items))
+ service_roots := make([]string, 0, len(m.Items))
for _, element := range m.Items {
n := ""
@@ -87,16 +85,14 @@ func (this *KeepClient) discoverKeepServers() error {
// Skip duplicates
if !listed[url] {
listed[url] = true
- this.Service_roots = append(this.Service_roots, url)
+ service_roots = append(service_roots, url)
}
if element.SvcType == "proxy" {
this.Using_proxy = true
}
}
- // Must be sorted for ShuffledServiceRoots() to produce consistent
- // results.
- sort.Strings(this.Service_roots)
+ this.SetServiceRoots(service_roots)
return nil
}
@@ -111,11 +107,12 @@ func (this KeepClient) shuffledServiceRoots(hash string) (pseq []string) {
seed := hash
// Keep servers still to be added to the ordering
- pool := make([]string, len(this.Service_roots))
- copy(pool, this.Service_roots)
+ service_roots := this.ServiceRoots()
+ pool := make([]string, len(service_roots))
+ copy(pool, service_roots)
// output probe sequence
- pseq = make([]string, 0, len(this.Service_roots))
+ pseq = make([]string, 0, len(service_roots))
// iterate while there are servers left to be assigned
for len(pool) > 0 {
diff --git a/services/keep/src/arvados.org/keepproxy/keepproxy.go b/services/keep/src/arvados.org/keepproxy/keepproxy.go
index ed33ac9..b914f47 100644
--- a/services/keep/src/arvados.org/keepproxy/keepproxy.go
+++ b/services/keep/src/arvados.org/keepproxy/keepproxy.go
@@ -92,8 +92,10 @@ func main() {
return
}
+ go RefreshServicesList(&kc)
+
// Start listening for requests.
- http.Serve(listener, MakeRESTRouter(!no_get, !no_put, kc))
+ http.Serve(listener, MakeRESTRouter(!no_get, !no_put, &kc))
}
type ApiTokenCache struct {
@@ -102,6 +104,14 @@ type ApiTokenCache struct {
expireTime int64
}
+// Refresh the keep service list every five minutes.
+func RefreshServicesList(kc *keepclient.KeepClient) {
+ for {
+ time.Sleep(300 * time.Second)
+ kc.DiscoverKeepServers()
+ }
+}
+
// Cache the token and set an expire time. If we already have an expire time
// on the token, it is not updated.
func (this *ApiTokenCache) RememberToken(token string) {
@@ -181,12 +191,12 @@ func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, re
}
type GetBlockHandler struct {
- keepclient.KeepClient
+ *keepclient.KeepClient
*ApiTokenCache
}
type PutBlockHandler struct {
- keepclient.KeepClient
+ *keepclient.KeepClient
*ApiTokenCache
}
@@ -197,7 +207,7 @@ type PutBlockHandler struct {
func MakeRESTRouter(
enable_get bool,
enable_put bool,
- kc keepclient.KeepClient) *mux.Router {
+ kc *keepclient.KeepClient) *mux.Router {
t := &ApiTokenCache{tokens: make(map[string]int64), expireTime: 300}
@@ -222,7 +232,9 @@ func MakeRESTRouter(
func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
- if !CheckAuthorizationHeader(this.KeepClient, this.ApiTokenCache, req) {
+ kc := *this.KeepClient
+
+ if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
}
@@ -235,10 +247,10 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
var blocklen int64
if req.Method == "GET" {
- reader, blocklen, _, err = this.KeepClient.AuthorizedGet(hash, signature, timestamp)
+ reader, blocklen, _, err = kc.AuthorizedGet(hash, signature, timestamp)
defer reader.Close()
} else if req.Method == "HEAD" {
- blocklen, _, err = this.KeepClient.AuthorizedAsk(hash, signature, timestamp)
+ blocklen, _, err = kc.AuthorizedAsk(hash, signature, timestamp)
}
resp.Header().Set("Content-Length", fmt.Sprint(blocklen))
@@ -259,7 +271,9 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
log.Print("PutBlockHandler start")
- if !CheckAuthorizationHeader(this.KeepClient, this.ApiTokenCache, req) {
+ kc := *this.KeepClient
+
+ if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
}
@@ -284,12 +298,12 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
var r int
_, err := fmt.Sscanf(req.Header.Get("X-Keep-Desired-Replicas"), "%d", &r)
if err != nil {
- this.KeepClient.Want_replicas = r
+ kc.Want_replicas = r
}
}
// Now try to put the block through
- replicas, err := this.KeepClient.PutHR(hash, req.Body, contentLength)
+ replicas, err := kc.PutHR(hash, req.Body, contentLength)
log.Printf("Replicas stored: %v err: %v", replicas, err)
diff --git a/services/keep/src/arvados.org/keepproxy/keepproxy_test.go b/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
index af1377b..d8abda7 100644
--- a/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
+++ b/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
@@ -109,8 +109,8 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
kc, err := keepclient.MakeKeepClient()
c.Check(kc.External, Equals, true)
c.Check(kc.Using_proxy, Equals, true)
- c.Check(len(kc.Service_roots), Equals, 1)
- c.Check(kc.Service_roots[0], Equals, "http://localhost:29950")
+ c.Check(len(kc.ServiceRoots()), Equals, 1)
+ c.Check(kc.ServiceRoots()[0], Equals, "http://localhost:29950")
c.Check(err, Equals, nil)
os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
@@ -159,8 +159,8 @@ func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
kc.ApiToken = "123xyz"
c.Check(kc.External, Equals, true)
c.Check(kc.Using_proxy, Equals, true)
- c.Check(len(kc.Service_roots), Equals, 1)
- c.Check(kc.Service_roots[0], Equals, "http://localhost:29950")
+ c.Check(len(kc.ServiceRoots()), Equals, 1)
+ c.Check(kc.ServiceRoots()[0], Equals, "http://localhost:29950")
c.Check(err, Equals, nil)
os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
commit cbf78e13973654510c5cc33b6420667611ef5245
Merge: be92cf1 d582312
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu May 22 13:38:10 2014 -0400
Merge branch 'master' into 1885-keep-proxy refs #1885
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list