[ARVADOS] updated: 293b52241445edf9eb86e0a0f9b63fccc3aa7fbd
git at public.curoverse.com
git at public.curoverse.com
Fri May 23 15:54:14 EDT 2014
Summary of changes:
sdk/go/build.sh | 37 --
sdk/go/go.sh | 17 +
sdk/go/src/arvados.org/keepclient/keepclient.go | 45 ++-
.../src/arvados.org/keepclient/keepclient_test.go | 94 ++---
sdk/go/src/arvados.org/keepclient/support.go | 41 +--
services/keep/build.sh | 36 --
services/keep/go.sh | 17 +
.../keep/src/arvados.org/keepproxy/keepproxy.go | 379 +++++++++++++++++++++
.../src/arvados.org/keepproxy/keepproxy_test.go | 205 +++++++++++
9 files changed, 735 insertions(+), 136 deletions(-)
delete mode 100755 sdk/go/build.sh
create mode 100755 sdk/go/go.sh
delete mode 100755 services/keep/build.sh
create mode 100755 services/keep/go.sh
create mode 100644 services/keep/src/arvados.org/keepproxy/keepproxy.go
create mode 100644 services/keep/src/arvados.org/keepproxy/keepproxy_test.go
via 293b52241445edf9eb86e0a0f9b63fccc3aa7fbd (commit)
via b06affa05ffbb121512ae179feb06c0bf90541bd (commit)
via 8570c81e79a9c00a314e2cf28a008b9aeae65037 (commit)
via 8815915612d1d8fef2001083bec2a24b3ed2eb3f (commit)
via 7331dca3c9f4a75e291b9b975f7cd570ad2b3f96 (commit)
via b668cb88f24085f92858b268120650c28d3a79af (commit)
via ec4e4a338f45ce39b4081f67c2b991c15fdf0fcd (commit)
via 14eddaec927be590682e226b9998c3e616c0b72e (commit)
via 216733543e5348ef8ef25ee5edbe6ed2dd35f5d3 (commit)
via 34ab7b208c7eb14ccae1e31831286acf9ea29487 (commit)
via 45d037864f655373dacd681d753ea3270f4ba997 (commit)
via 744ca00ccd1e7fda9d4210b18c1f4a734f3b2261 (commit)
via cd85fc2c07a0e9d8c34b81e4523ee3d1ebd696e9 (commit)
via 48ffdd5ac196771381c8dc9ab47cfad5f1929720 (commit)
from b28565c8aa08cbf70762fa69e49c5067fcb57e96 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 293b52241445edf9eb86e0a0f9b63fccc3aa7fbd
Merge: b28565c b06affa
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri May 23 15:37:57 2014 -0400
Merge branch '1885-keep-proxy' closes #1885
commit b06affa05ffbb121512ae179feb06c0bf90541bd
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri May 23 14:08:53 2014 -0400
1885: Added logging of invalid requests. Added logging when the server list is
updated. Improved KeepClient error reporting.
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go
index 5ba6257..8d26b32 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@ -23,6 +23,8 @@ const BLOCKSIZE = 64 * 1024 * 1024
var BlockNotFound = errors.New("Block not found")
var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
var OversizeBlockError = errors.New("Block too big")
+var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST")
+var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN")
const X_Keep_Desired_Replicas = "X-Keep-Desired-Replicas"
const X_Keep_Replicas_Stored = "X-Keep-Replicas-Stored"
@@ -58,6 +60,13 @@ func MakeKeepClient() (kc KeepClient, err error) {
Using_proxy: false,
External: external}
+ if os.Getenv("ARVADOS_API_HOST") == "" {
+ return kc, MissingArvadosApiHost
+ }
+ if os.Getenv("ARVADOS_API_TOKEN") == "" {
+ return kc, MissingArvadosApiToken
+ }
+
err = (&kc).DiscoverKeepServers()
return kc, err
diff --git a/sdk/go/src/arvados.org/keepclient/support.go b/sdk/go/src/arvados.org/keepclient/support.go
index f924137..913f7c7 100644
--- a/sdk/go/src/arvados.org/keepclient/support.go
+++ b/sdk/go/src/arvados.org/keepclient/support.go
@@ -47,7 +47,7 @@ func (this *KeepClient) DiscoverKeepServers() error {
return err
}
- if resp.StatusCode != 200 {
+ if resp.StatusCode != http.StatusOK {
// fall back on keep disks
if req, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/keep_disks", this.ApiServer), nil); err != nil {
return err
@@ -56,6 +56,9 @@ func (this *KeepClient) DiscoverKeepServers() error {
if resp, err = this.Client.Do(req); err != nil {
return err
}
+ if resp.StatusCode != http.StatusOK {
+ return errors.New(resp.Status)
+ }
}
type svcList struct {
diff --git a/services/keep/src/arvados.org/keepproxy/keepproxy.go b/services/keep/src/arvados.org/keepproxy/keepproxy.go
index 9e0b2ff..2d7e276 100644
--- a/services/keep/src/arvados.org/keepproxy/keepproxy.go
+++ b/services/keep/src/arvados.org/keepproxy/keepproxy.go
@@ -65,7 +65,7 @@ func main() {
kc, err := keepclient.MakeKeepClient()
if err != nil {
- log.Fatal(err)
+ log.Fatalf("Error setting up keep client %s", err.Error())
}
if pidfile != "" {
@@ -87,6 +87,8 @@ func main() {
go RefreshServicesList(&kc)
+ log.Printf("Arvados Keep proxy started listening on %v with server list %v", listener.Addr(), kc.ServiceRoots())
+
// Start listening for requests.
http.Serve(listener, MakeRESTRouter(!no_get, !no_put, &kc))
}
@@ -101,7 +103,14 @@ type ApiTokenCache struct {
func RefreshServicesList(kc *keepclient.KeepClient) {
for {
time.Sleep(300 * time.Second)
+ oldservices := kc.ServiceRoots()
kc.DiscoverKeepServers()
+ newservices := kc.ServiceRoots()
+ s1 := fmt.Sprint(oldservices)
+ s2 := fmt.Sprint(newservices)
+ if s1 != s2 {
+ log.Printf("Updated server list to %v", s2)
+ }
}
}
@@ -167,7 +176,7 @@ func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, re
var usersreq *http.Request
- if usersreq, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/users/current", kc.ApiServer), nil); err != nil {
+ if usersreq, err = http.NewRequest("HEAD", fmt.Sprintf("https://%s/arvados/v1/users/current", kc.ApiServer), nil); err != nil {
// Can't construct the request
log.Printf("%s: CheckAuthorizationHeader error: %v", GetRemoteAddress(req), err)
return false
@@ -206,6 +215,8 @@ type PutBlockHandler struct {
*ApiTokenCache
}
+type InvalidPathHandler struct{}
+
// MakeRESTRouter
// Returns a mux.Router that passes GET and PUT requests to the
// appropriate handlers.
@@ -233,9 +244,16 @@ func MakeRESTRouter(
ph.Methods("PUT")
}
+ rest.NotFoundHandler = InvalidPathHandler{}
+
return rest
}
+func (this InvalidPathHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ log.Printf("%s: %s %s unroutable", GetRemoteAddress(req), req.Method, req.URL.Path)
+ http.Error(resp, "Bad request", http.StatusBadRequest)
+}
+
func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
kc := *this.KeepClient
commit 8570c81e79a9c00a314e2cf28a008b9aeae65037
Merge: 8815915 2099a0a
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri May 23 10:27:59 2014 -0400
Merge branch 'master' into 1885-keep-proxy refs #1885
commit 8815915612d1d8fef2001083bec2a24b3ed2eb3f
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri May 23 10:26:31 2014 -0400
1885: Fix bug in not returning from the handler on failed authorization.
Improved logging (which uncovered the previous bug). Tweaked go.sh and tests.
diff --git a/sdk/go/go.sh b/sdk/go/go.sh
index 89f81fb..5553567 100755
--- a/sdk/go/go.sh
+++ b/sdk/go/go.sh
@@ -1,6 +1,11 @@
#! /bin/sh
-rootdir=$(dirname $0)
+# Wraps the 'go' executable with some environment setup. Sets GOPATH, creates
+# 'pkg' and 'bin' directories, automatically installs dependencies, then runs
+# the underlying 'go' executable with any command line parameters provided to
+# the script.
+
+rootdir=$(readlink -f $(dirname $0))
GOPATH=$rootdir:$GOPATH
export GOPATH
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go
index dadf8bf..5ba6257 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@ -24,6 +24,9 @@ var BlockNotFound = errors.New("Block not found")
var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
var OversizeBlockError = errors.New("Block too big")
+const X_Keep_Desired_Replicas = "X-Keep-Desired-Replicas"
+const X_Keep_Replicas_Stored = "X-Keep-Replicas-Stored"
+
// Information about Arvados and Keep servers.
type KeepClient struct {
ApiServer string
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
index 395603d..291d8f8 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
@@ -44,8 +44,12 @@ func (s *ServerRequiredSuite) SetUpSuite(c *C) {
c.Skip("Skipping tests that require server")
} else {
os.Chdir(pythonDir())
- exec.Command("python", "run_test_server.py", "start").Run()
- exec.Command("python", "run_test_server.py", "start_keep").Run()
+ if err := exec.Command("python", "run_test_server.py", "start").Run(); err != nil {
+ panic("'python run_test_server.py start' returned error")
+ }
+ if err := exec.Command("python", "run_test_server.py", "start_keep").Run(); err != nil {
+ panic("'python run_test_server.py start_keep' returned error")
+ }
}
}
diff --git a/sdk/go/src/arvados.org/keepclient/support.go b/sdk/go/src/arvados.org/keepclient/support.go
index 38669a1..f924137 100644
--- a/sdk/go/src/arvados.org/keepclient/support.go
+++ b/sdk/go/src/arvados.org/keepclient/support.go
@@ -156,7 +156,7 @@ type uploadStatus struct {
func (this KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
upload_status chan<- uploadStatus, expectedLength int64) {
- log.Printf("Uploading to %s", host)
+ log.Printf("Uploading %s to %s", hash, host)
var req *http.Request
var err error
@@ -175,7 +175,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
req.Header.Add("Content-Type", "application/octet-stream")
if this.Using_proxy {
- req.Header.Add("X-Keep-Desired-Replicas", fmt.Sprint(this.Want_replicas))
+ req.Header.Add(X_Keep_Desired_Replicas, fmt.Sprint(this.Want_replicas))
}
req.Body = body
@@ -188,7 +188,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
}
rep := 1
- if xr := resp.Header.Get("X-Keep-Replicas-Stored"); xr != "" {
+ if xr := resp.Header.Get(X_Keep_Replicas_Stored); xr != "" {
fmt.Sscanf(xr, "%d", &rep)
}
@@ -248,7 +248,7 @@ func (this KeepClient) putReplicas(
status.url, status.err)
}
active -= 1
- log.Printf("Upload status code: %v remaining replicas: %v active: %v", status.statusCode, remaining_replicas, active)
+ log.Printf("Upload to %v status code: %v remaining replicas: %v active: %v", status.url, status.statusCode, remaining_replicas, active)
}
return this.Want_replicas, nil
diff --git a/services/keep/go.sh b/services/keep/go.sh
index 177b27d..156fe90 100755
--- a/services/keep/go.sh
+++ b/services/keep/go.sh
@@ -1,6 +1,11 @@
#! /bin/sh
-rootdir=$(dirname $0)
+# Wraps the 'go' executable with some environment setup. Sets GOPATH, creates
+# 'pkg' and 'bin' directories, automatically installs dependencies, then runs
+# the underlying 'go' executable with any command line parameters provided to
+# the script.
+
+rootdir=$(readlink -f $(dirname $0))
GOPATH=$rootdir:$rootdir/../../sdk/go:$GOPATH
export GOPATH
diff --git a/services/keep/src/arvados.org/keepproxy/keepproxy.go b/services/keep/src/arvados.org/keepproxy/keepproxy.go
index b914f47..9e0b2ff 100644
--- a/services/keep/src/arvados.org/keepproxy/keepproxy.go
+++ b/services/keep/src/arvados.org/keepproxy/keepproxy.go
@@ -63,15 +63,9 @@ func main() {
flag.Parse()
- /*if no_get == false {
- log.Print("Must specify -no-get")
- return
- }*/
-
kc, err := keepclient.MakeKeepClient()
if err != nil {
- log.Print(err)
- return
+ log.Fatal(err)
}
if pidfile != "" {
@@ -88,8 +82,7 @@ func main() {
listener, err = net.Listen("tcp", listen)
if err != nil {
- log.Printf("Could not listen on %v", listen)
- return
+ log.Fatalf("Could not listen on %v", listen)
}
go RefreshServicesList(&kc)
@@ -143,13 +136,25 @@ func (this *ApiTokenCache) RecallToken(token string) bool {
}
}
+func GetRemoteAddress(req *http.Request) string {
+ if realip := req.Header.Get("X-Real-IP"); realip != "" {
+ if forwarded := req.Header.Get("X-Forwarded-For"); forwarded != realip {
+ return fmt.Sprintf("%s (X-Forwarded-For %s)", realip, forwarded)
+ } else {
+ return realip
+ }
+ }
+ return req.RemoteAddr
+}
+
func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, req *http.Request) bool {
- if req.Header.Get("Authorization") == "" {
+ var auth string
+ if auth = req.Header.Get("Authorization"); auth == "" {
return false
}
var tok string
- _, err := fmt.Sscanf(req.Header.Get("Authorization"), "OAuth2 %s", &tok)
+ _, err := fmt.Sscanf(auth, "OAuth2 %s", &tok)
if err != nil {
// Scanning error
return false
@@ -164,7 +169,7 @@ func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, re
if usersreq, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/users/current", kc.ApiServer), nil); err != nil {
// Can't construct the request
- log.Print("CheckAuthorizationHeader error: %v", err)
+ log.Printf("%s: CheckAuthorizationHeader error: %v", GetRemoteAddress(req), err)
return false
}
@@ -175,12 +180,13 @@ func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, re
var resp *http.Response
if resp, err = kc.Client.Do(usersreq); err != nil {
// Something else failed
- log.Print("CheckAuthorizationHeader error: %v", err)
+ log.Printf("%s: CheckAuthorizationHeader error connecting to API server: %v", GetRemoteAddress(req), err.Error())
return false
}
if resp.StatusCode != http.StatusOK {
// Bad status
+ log.Printf("%s: CheckAuthorizationHeader API server responded: %v", GetRemoteAddress(req), resp.Status)
return false
}
@@ -234,14 +240,17 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
kc := *this.KeepClient
- if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
- http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
- }
-
hash := mux.Vars(req)["hash"]
signature := mux.Vars(req)["signature"]
timestamp := mux.Vars(req)["timestamp"]
+ log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, hash)
+
+ if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
+ http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
+ return
+ }
+
var reader io.ReadCloser
var err error
var blocklen int64
@@ -258,25 +267,32 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
switch err {
case nil:
if reader != nil {
- io.Copy(resp, reader)
+ n, err2 := io.Copy(resp, reader)
+ if n != blocklen {
+ log.Printf("%s: %s %s mismatched return %v with Content-Length %v error", GetRemoteAddress(req), req.Method, hash, n, blocklen, err.Error())
+ } else if err2 == nil {
+ log.Printf("%s: %s %s success returned %v bytes", GetRemoteAddress(req), req.Method, hash, n)
+ } else {
+ log.Printf("%s: %s %s returned %v bytes error %v", GetRemoteAddress(req), req.Method, hash, n, err.Error())
+ }
+ } else {
+ log.Printf("%s: %s %s success", GetRemoteAddress(req), req.Method, hash)
}
case keepclient.BlockNotFound:
http.Error(resp, "Not found", http.StatusNotFound)
default:
http.Error(resp, err.Error(), http.StatusBadGateway)
}
+
+ if err != nil {
+ log.Printf("%s: %s %s error %s", GetRemoteAddress(req), req.Method, hash, err.Error())
+ }
}
func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
- log.Print("PutBlockHandler start")
-
kc := *this.KeepClient
- if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
- http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
- }
-
hash := mux.Vars(req)["hash"]
var contentLength int64 = -1
@@ -288,15 +304,22 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
}
+ log.Printf("%s: %s %s Content-Length %v", GetRemoteAddress(req), req.Method, hash, contentLength)
+
if contentLength < 1 {
http.Error(resp, "Must include Content-Length header", http.StatusLengthRequired)
return
}
+ if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
+ http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
+ return
+ }
+
// Check if the client specified the number of replicas
if req.Header.Get("X-Keep-Desired-Replicas") != "" {
var r int
- _, err := fmt.Sscanf(req.Header.Get("X-Keep-Desired-Replicas"), "%d", &r)
+ _, err := fmt.Sscanf(req.Header.Get(keepclient.X_Keep_Desired_Replicas), "%d", &r)
if err != nil {
kc.Want_replicas = r
}
@@ -305,14 +328,13 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
// Now try to put the block through
replicas, err := kc.PutHR(hash, req.Body, contentLength)
- log.Printf("Replicas stored: %v err: %v", replicas, err)
-
// Tell the client how many successful PUTs we accomplished
- resp.Header().Set("X-Keep-Replicas-Stored", fmt.Sprintf("%d", replicas))
+ resp.Header().Set(keepclient.X_Keep_Replicas_Stored, fmt.Sprintf("%d", replicas))
switch err {
case nil:
// Default will return http.StatusOK
+ log.Printf("%s: %s %s finished, stored %v replicas (desired %v)", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas)
case keepclient.OversizeBlockError:
// Too much data
@@ -332,4 +354,8 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
http.Error(resp, err.Error(), http.StatusBadGateway)
}
+ if err != nil {
+ log.Printf("%s: %s %s stored %v replicas (desired %v) got error %v", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas, err.Error())
+ }
+
}
diff --git a/services/keep/src/arvados.org/keepproxy/keepproxy_test.go b/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
index d8abda7..f03c94f 100644
--- a/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
+++ b/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
@@ -39,8 +39,13 @@ func (s *ServerRequiredSuite) SetUpSuite(c *C) {
defer os.Chdir(cwd)
os.Chdir(pythonDir())
- exec.Command("python", "run_test_server.py", "start").Run()
- exec.Command("python", "run_test_server.py", "start_keep").Run()
+
+ if err := exec.Command("python", "run_test_server.py", "start").Run(); err != nil {
+ panic("'python run_test_server.py start' returned error")
+ }
+ if err := exec.Command("python", "run_test_server.py", "start_keep").Run(); err != nil {
+ panic("'python run_test_server.py start_keep' returned error")
+ }
os.Setenv("ARVADOS_API_HOST", "localhost:3001")
os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
commit 7331dca3c9f4a75e291b9b975f7cd570ad2b3f96
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu May 22 14:51:56 2014 -0400
1885: Made ServiceRoots atomically updatable, so that KeepProxy can poll for
changes in the services list without disrupting any active flows.
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go
index 91989bd..dadf8bf 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@ -11,6 +11,10 @@ import (
"io/ioutil"
"net/http"
"os"
+ "sort"
+ "sync"
+ "sync/atomic"
+ "unsafe"
)
// A Keep "block" is 64MB.
@@ -25,11 +29,12 @@ type KeepClient struct {
ApiServer string
ApiToken string
ApiInsecure bool
- Service_roots []string
Want_replicas int
Client *http.Client
Using_proxy bool
External bool
+ service_roots *[]string
+ lock sync.Mutex
}
// Create a new KeepClient, initialized with standard Arvados environment
@@ -50,7 +55,7 @@ func MakeKeepClient() (kc KeepClient, err error) {
Using_proxy: false,
External: external}
- err = (&kc).discoverKeepServers()
+ err = (&kc).DiscoverKeepServers()
return kc, err
}
@@ -207,3 +212,22 @@ func (this KeepClient) AuthorizedAsk(hash string, signature string,
return 0, "", BlockNotFound
}
+
+// Atomically read the service_roots field.
+func (this *KeepClient) ServiceRoots() []string {
+ r := (*[]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots))))
+ return *r
+}
+
+// Atomically update the service_roots field. Enables you to update
+// service_roots without disrupting any GET or PUT operations that might
+// already be in progress.
+func (this *KeepClient) SetServiceRoots(svc []string) {
+ // Must be sorted for ShuffledServiceRoots() to produce consistent
+ // results.
+ roots := make([]string, len(svc))
+ copy(roots, svc)
+ sort.Strings(roots)
+ atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)),
+ unsafe.Pointer(&roots))
+}
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
index 600d739..395603d 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
@@ -13,7 +13,6 @@ import (
"net/http"
"os"
"os/exec"
- "sort"
"strings"
"testing"
)
@@ -75,13 +74,14 @@ func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
c.Check(kc.Client.Transport.(*http.Transport).TLSClientConfig.InsecureSkipVerify, Equals, true)
c.Assert(err, Equals, nil)
- c.Check(len(kc.Service_roots), Equals, 2)
- c.Check(kc.Service_roots[0], Equals, "http://localhost:25107")
- c.Check(kc.Service_roots[1], Equals, "http://localhost:25108")
+ c.Check(len(kc.ServiceRoots()), Equals, 2)
+ c.Check(kc.ServiceRoots()[0], Equals, "http://localhost:25107")
+ c.Check(kc.ServiceRoots()[1], Equals, "http://localhost:25108")
}
func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
- kc := KeepClient{Service_roots: []string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"}}
+ kc := KeepClient{}
+ kc.SetServiceRoots([]string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"})
// "foo" acbd18db4cc2f85cedef654fccc4a4d8
foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
@@ -265,16 +265,16 @@ func (s *StandaloneSuite) TestPutB(c *C) {
kc.Want_replicas = 2
kc.ApiToken = "abc123"
- kc.Service_roots = make([]string, 5)
+ service_roots := make([]string, 5)
ks := RunSomeFakeKeepServers(st, 5, 2990)
for i := 0; i < len(ks); i += 1 {
- kc.Service_roots[i] = ks[i].url
+ service_roots[i] = ks[i].url
defer ks[i].listener.Close()
}
- sort.Strings(kc.Service_roots)
+ kc.SetServiceRoots(service_roots)
kc.PutB([]byte("foo"))
@@ -306,16 +306,16 @@ func (s *StandaloneSuite) TestPutHR(c *C) {
kc.Want_replicas = 2
kc.ApiToken = "abc123"
- kc.Service_roots = make([]string, 5)
+ service_roots := make([]string, 5)
ks := RunSomeFakeKeepServers(st, 5, 2990)
for i := 0; i < len(ks); i += 1 {
- kc.Service_roots[i] = ks[i].url
+ service_roots[i] = ks[i].url
defer ks[i].listener.Close()
}
- sort.Strings(kc.Service_roots)
+ kc.SetServiceRoots(service_roots)
reader, writer := io.Pipe()
@@ -359,21 +359,21 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) {
kc.Want_replicas = 2
kc.ApiToken = "abc123"
- kc.Service_roots = make([]string, 5)
+ service_roots := make([]string, 5)
ks1 := RunSomeFakeKeepServers(st, 4, 2990)
ks2 := RunSomeFakeKeepServers(fh, 1, 2995)
for i, k := range ks1 {
- kc.Service_roots[i] = k.url
+ service_roots[i] = k.url
defer k.listener.Close()
}
for i, k := range ks2 {
- kc.Service_roots[len(ks1)+i] = k.url
+ service_roots[len(ks1)+i] = k.url
defer k.listener.Close()
}
- sort.Strings(kc.Service_roots)
+ kc.SetServiceRoots(service_roots)
shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
@@ -407,21 +407,21 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
kc.Want_replicas = 2
kc.ApiToken = "abc123"
- kc.Service_roots = make([]string, 5)
+ service_roots := make([]string, 5)
ks1 := RunSomeFakeKeepServers(st, 1, 2990)
ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
for i, k := range ks1 {
- kc.Service_roots[i] = k.url
+ service_roots[i] = k.url
defer k.listener.Close()
}
for i, k := range ks2 {
- kc.Service_roots[len(ks1)+i] = k.url
+ service_roots[len(ks1)+i] = k.url
defer k.listener.Close()
}
- sort.Strings(kc.Service_roots)
+ kc.SetServiceRoots(service_roots)
shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
@@ -464,7 +464,7 @@ func (s *StandaloneSuite) TestGet(c *C) {
kc, _ := MakeKeepClient()
kc.ApiToken = "abc123"
- kc.Service_roots = []string{url}
+ kc.SetServiceRoots([]string{url})
r, n, url2, err := kc.Get(hash)
defer r.Close()
@@ -489,7 +489,7 @@ func (s *StandaloneSuite) TestGetFail(c *C) {
kc, _ := MakeKeepClient()
kc.ApiToken = "abc123"
- kc.Service_roots = []string{url}
+ kc.SetServiceRoots([]string{url})
r, n, url2, err := kc.Get(hash)
c.Check(err, Equals, BlockNotFound)
@@ -518,7 +518,7 @@ func (s *StandaloneSuite) TestChecksum(c *C) {
kc, _ := MakeKeepClient()
kc.ApiToken = "abc123"
- kc.Service_roots = []string{url}
+ kc.SetServiceRoots([]string{url})
r, n, _, err := kc.Get(barhash)
_, err = ioutil.ReadAll(r)
@@ -550,21 +550,21 @@ func (s *StandaloneSuite) TestGetWithFailures(c *C) {
kc, _ := MakeKeepClient()
kc.ApiToken = "abc123"
- kc.Service_roots = make([]string, 5)
+ service_roots := make([]string, 5)
ks1 := RunSomeFakeKeepServers(st, 1, 2990)
ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
for i, k := range ks1 {
- kc.Service_roots[i] = k.url
+ service_roots[i] = k.url
defer k.listener.Close()
}
for i, k := range ks2 {
- kc.Service_roots[len(ks1)+i] = k.url
+ service_roots[len(ks1)+i] = k.url
defer k.listener.Close()
}
- sort.Strings(kc.Service_roots)
+ kc.SetServiceRoots(service_roots)
r, n, url2, err := kc.Get(hash)
<-fh.handled
@@ -635,15 +635,17 @@ func (s *StandaloneSuite) TestPutProxy(c *C) {
kc.Want_replicas = 2
kc.Using_proxy = true
kc.ApiToken = "abc123"
- kc.Service_roots = make([]string, 1)
+ service_roots := make([]string, 1)
ks1 := RunSomeFakeKeepServers(st, 1, 2990)
for i, k := range ks1 {
- kc.Service_roots[i] = k.url
+ service_roots[i] = k.url
defer k.listener.Close()
}
+ kc.SetServiceRoots(service_roots)
+
_, replicas, err := kc.PutB([]byte("foo"))
<-st.handled
@@ -663,14 +665,15 @@ func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
kc.Want_replicas = 3
kc.Using_proxy = true
kc.ApiToken = "abc123"
- kc.Service_roots = make([]string, 1)
+ service_roots := make([]string, 1)
ks1 := RunSomeFakeKeepServers(st, 1, 2990)
for i, k := range ks1 {
- kc.Service_roots[i] = k.url
+ service_roots[i] = k.url
defer k.listener.Close()
}
+ kc.SetServiceRoots(service_roots)
_, replicas, err := kc.PutB([]byte("foo"))
<-st.handled
diff --git a/sdk/go/src/arvados.org/keepclient/support.go b/sdk/go/src/arvados.org/keepclient/support.go
index ef4a8e1..38669a1 100644
--- a/sdk/go/src/arvados.org/keepclient/support.go
+++ b/sdk/go/src/arvados.org/keepclient/support.go
@@ -10,7 +10,6 @@ import (
"log"
"net/http"
"os"
- "sort"
"strconv"
)
@@ -21,10 +20,9 @@ type keepDisk struct {
SvcType string `json:"service_type"`
}
-func (this *KeepClient) discoverKeepServers() error {
+func (this *KeepClient) DiscoverKeepServers() error {
if prx := os.Getenv("ARVADOS_KEEP_PROXY"); prx != "" {
- this.Service_roots = make([]string, 1)
- this.Service_roots[0] = prx
+ this.SetServiceRoots([]string{prx})
this.Using_proxy = true
return nil
}
@@ -72,7 +70,7 @@ func (this *KeepClient) discoverKeepServers() error {
}
listed := make(map[string]bool)
- this.Service_roots = make([]string, 0, len(m.Items))
+ service_roots := make([]string, 0, len(m.Items))
for _, element := range m.Items {
n := ""
@@ -87,16 +85,14 @@ func (this *KeepClient) discoverKeepServers() error {
// Skip duplicates
if !listed[url] {
listed[url] = true
- this.Service_roots = append(this.Service_roots, url)
+ service_roots = append(service_roots, url)
}
if element.SvcType == "proxy" {
this.Using_proxy = true
}
}
- // Must be sorted for ShuffledServiceRoots() to produce consistent
- // results.
- sort.Strings(this.Service_roots)
+ this.SetServiceRoots(service_roots)
return nil
}
@@ -111,11 +107,12 @@ func (this KeepClient) shuffledServiceRoots(hash string) (pseq []string) {
seed := hash
// Keep servers still to be added to the ordering
- pool := make([]string, len(this.Service_roots))
- copy(pool, this.Service_roots)
+ service_roots := this.ServiceRoots()
+ pool := make([]string, len(service_roots))
+ copy(pool, service_roots)
// output probe sequence
- pseq = make([]string, 0, len(this.Service_roots))
+ pseq = make([]string, 0, len(service_roots))
// iterate while there are servers left to be assigned
for len(pool) > 0 {
diff --git a/services/keep/src/arvados.org/keepproxy/keepproxy.go b/services/keep/src/arvados.org/keepproxy/keepproxy.go
index ed33ac9..b914f47 100644
--- a/services/keep/src/arvados.org/keepproxy/keepproxy.go
+++ b/services/keep/src/arvados.org/keepproxy/keepproxy.go
@@ -92,8 +92,10 @@ func main() {
return
}
+ go RefreshServicesList(&kc)
+
// Start listening for requests.
- http.Serve(listener, MakeRESTRouter(!no_get, !no_put, kc))
+ http.Serve(listener, MakeRESTRouter(!no_get, !no_put, &kc))
}
type ApiTokenCache struct {
@@ -102,6 +104,14 @@ type ApiTokenCache struct {
expireTime int64
}
+// Refresh the keep service list every five minutes.
+func RefreshServicesList(kc *keepclient.KeepClient) {
+ for {
+ time.Sleep(300 * time.Second)
+ kc.DiscoverKeepServers()
+ }
+}
+
// Cache the token and set an expire time. If we already have an expire time
// on the token, it is not updated.
func (this *ApiTokenCache) RememberToken(token string) {
@@ -181,12 +191,12 @@ func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, re
}
type GetBlockHandler struct {
- keepclient.KeepClient
+ *keepclient.KeepClient
*ApiTokenCache
}
type PutBlockHandler struct {
- keepclient.KeepClient
+ *keepclient.KeepClient
*ApiTokenCache
}
@@ -197,7 +207,7 @@ type PutBlockHandler struct {
func MakeRESTRouter(
enable_get bool,
enable_put bool,
- kc keepclient.KeepClient) *mux.Router {
+ kc *keepclient.KeepClient) *mux.Router {
t := &ApiTokenCache{tokens: make(map[string]int64), expireTime: 300}
@@ -222,7 +232,9 @@ func MakeRESTRouter(
func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
- if !CheckAuthorizationHeader(this.KeepClient, this.ApiTokenCache, req) {
+ kc := *this.KeepClient
+
+ if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
}
@@ -235,10 +247,10 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
var blocklen int64
if req.Method == "GET" {
- reader, blocklen, _, err = this.KeepClient.AuthorizedGet(hash, signature, timestamp)
+ reader, blocklen, _, err = kc.AuthorizedGet(hash, signature, timestamp)
defer reader.Close()
} else if req.Method == "HEAD" {
- blocklen, _, err = this.KeepClient.AuthorizedAsk(hash, signature, timestamp)
+ blocklen, _, err = kc.AuthorizedAsk(hash, signature, timestamp)
}
resp.Header().Set("Content-Length", fmt.Sprint(blocklen))
@@ -259,7 +271,9 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
log.Print("PutBlockHandler start")
- if !CheckAuthorizationHeader(this.KeepClient, this.ApiTokenCache, req) {
+ kc := *this.KeepClient
+
+ if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
}
@@ -284,12 +298,12 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
var r int
_, err := fmt.Sscanf(req.Header.Get("X-Keep-Desired-Replicas"), "%d", &r)
if err != nil {
- this.KeepClient.Want_replicas = r
+ kc.Want_replicas = r
}
}
// Now try to put the block through
- replicas, err := this.KeepClient.PutHR(hash, req.Body, contentLength)
+ replicas, err := kc.PutHR(hash, req.Body, contentLength)
log.Printf("Replicas stored: %v err: %v", replicas, err)
diff --git a/services/keep/src/arvados.org/keepproxy/keepproxy_test.go b/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
index af1377b..d8abda7 100644
--- a/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
+++ b/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
@@ -109,8 +109,8 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
kc, err := keepclient.MakeKeepClient()
c.Check(kc.External, Equals, true)
c.Check(kc.Using_proxy, Equals, true)
- c.Check(len(kc.Service_roots), Equals, 1)
- c.Check(kc.Service_roots[0], Equals, "http://localhost:29950")
+ c.Check(len(kc.ServiceRoots()), Equals, 1)
+ c.Check(kc.ServiceRoots()[0], Equals, "http://localhost:29950")
c.Check(err, Equals, nil)
os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
@@ -159,8 +159,8 @@ func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
kc.ApiToken = "123xyz"
c.Check(kc.External, Equals, true)
c.Check(kc.Using_proxy, Equals, true)
- c.Check(len(kc.Service_roots), Equals, 1)
- c.Check(kc.Service_roots[0], Equals, "http://localhost:29950")
+ c.Check(len(kc.ServiceRoots()), Equals, 1)
+ c.Check(kc.ServiceRoots()[0], Equals, "http://localhost:29950")
c.Check(err, Equals, nil)
os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
commit b668cb88f24085f92858b268120650c28d3a79af
Merge: ec4e4a3 d582312
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu May 22 13:38:10 2014 -0400
Merge branch 'master' into 1885-keep-proxy refs #1885
commit ec4e4a338f45ce39b4081f67c2b991c15fdf0fcd
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu May 22 13:36:58 2014 -0400
1885: GET and HEAD through the proxy work correctly now. Added invalid API token test.
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
index c67e09f..600d739 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
@@ -585,12 +585,20 @@ func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
kc, err := MakeKeepClient()
c.Assert(err, Equals, nil)
- hash, replicas, err := kc.PutB([]byte("foo"))
- c.Check(hash, Equals, fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
- c.Check(replicas, Equals, 2)
- c.Check(err, Equals, nil)
+ hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
{
+ n, _, err := kc.Ask(hash)
+ c.Check(err, Equals, BlockNotFound)
+ c.Check(n, Equals, int64(0))
+ }
+ {
+ hash2, replicas, err := kc.PutB([]byte("foo"))
+ c.Check(hash2, Equals, hash)
+ c.Check(replicas, Equals, 2)
+ c.Check(err, Equals, nil)
+ }
+ {
r, n, url2, err := kc.Get(hash)
c.Check(err, Equals, nil)
c.Check(n, Equals, int64(3))
@@ -600,7 +608,6 @@ func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
c.Check(err2, Equals, nil)
c.Check(content, DeepEquals, []byte("foo"))
}
-
{
n, url2, err := kc.Ask(hash)
c.Check(err, Equals, nil)
diff --git a/services/keep/src/arvados.org/keepproxy/keepproxy.go b/services/keep/src/arvados.org/keepproxy/keepproxy.go
index 8b8ff18..ed33ac9 100644
--- a/services/keep/src/arvados.org/keepproxy/keepproxy.go
+++ b/services/keep/src/arvados.org/keepproxy/keepproxy.go
@@ -25,7 +25,6 @@ func main() {
listen string
no_get bool
no_put bool
- no_head bool
default_replicas int
pidfile string
)
@@ -41,7 +40,7 @@ func main() {
flag.BoolVar(
&no_get,
"no-get",
- true,
+ false,
"If set, disable GET operations")
flag.BoolVar(
@@ -50,12 +49,6 @@ func main() {
false,
"If set, disable PUT operations")
- flag.BoolVar(
- &no_head,
- "no-head",
- true,
- "If set, disable HEAD operations")
-
flag.IntVar(
&default_replicas,
"default-replicas",
@@ -70,8 +63,8 @@ func main() {
flag.Parse()
- /*if no_get == false || no_head == false {
- log.Print("Must specify -no-get and -no-head")
+ /*if no_get == false {
+ log.Print("Must specify -no-get")
return
}*/
@@ -100,7 +93,7 @@ func main() {
}
// Start listening for requests.
- http.Serve(listener, MakeRESTRouter(!no_get, !no_put, !no_head, kc))
+ http.Serve(listener, MakeRESTRouter(!no_get, !no_put, kc))
}
type ApiTokenCache struct {
@@ -204,7 +197,6 @@ type PutBlockHandler struct {
func MakeRESTRouter(
enable_get bool,
enable_put bool,
- enable_head bool,
kc keepclient.KeepClient) *mux.Router {
t := &ApiTokenCache{tokens: make(map[string]int64), expireTime: 300}
@@ -217,23 +209,23 @@ func MakeRESTRouter(
ph := rest.Handle(`/{hash:[0-9a-f]{32}}`, PutBlockHandler{kc, t})
if enable_get {
- gh.Methods("GET")
- ghsig.Methods("GET")
+ gh.Methods("GET", "HEAD")
+ ghsig.Methods("GET", "HEAD")
}
if enable_put {
ph.Methods("PUT")
}
- if enable_head {
- gh.Methods("HEAD")
- ghsig.Methods("HEAD")
- }
-
return rest
}
func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+
+ if !CheckAuthorizationHeader(this.KeepClient, this.ApiTokenCache, req) {
+ http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
+ }
+
hash := mux.Vars(req)["hash"]
signature := mux.Vars(req)["signature"]
timestamp := mux.Vars(req)["timestamp"]
@@ -264,6 +256,9 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
}
func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+
+ log.Print("PutBlockHandler start")
+
if !CheckAuthorizationHeader(this.KeepClient, this.ApiTokenCache, req) {
http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
}
diff --git a/services/keep/src/arvados.org/keepproxy/keepproxy_test.go b/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
index 9871281..af1377b 100644
--- a/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
+++ b/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
@@ -47,9 +47,15 @@ func (s *ServerRequiredSuite) SetUpSuite(c *C) {
os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
SetupProxyService()
+
+ os.Args = []string{"keepproxy", "-listen=:29950"}
+ go main()
+ time.Sleep(100 * time.Millisecond)
}
func (s *ServerRequiredSuite) TearDownSuite(c *C) {
+ listener.Close()
+
cwd, _ := os.Getwd()
defer os.Chdir(cwd)
@@ -110,21 +116,13 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
log.Print("keepclient created")
- os.Args = []string{"keepproxy", "-listen=:29950"}
- go main()
-
- time.Sleep(100 * time.Millisecond)
-
- log.Print("keepproxy main started")
-
hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
- // Uncomment this when actual keep server supports HEAD
- /*{
+ {
_, _, err := kc.Ask(hash)
c.Check(err, Equals, keepclient.BlockNotFound)
log.Print("Ask 1")
- }*/
+ }
{
hash2, rep, err := kc.PutB([]byte("foo"))
@@ -134,25 +132,69 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
log.Print("PutB")
}
- // Uncomment this when actual keep server supports HEAD
- /*{
+ {
blocklen, _, err := kc.Ask(hash)
+ c.Assert(err, Equals, nil)
c.Check(blocklen, Equals, int64(3))
- c.Check(err, Equals, nil)
log.Print("Ask 2")
- }*/
+ }
{
reader, blocklen, _, err := kc.Get(hash)
+ c.Assert(err, Equals, nil)
all, err := ioutil.ReadAll(reader)
c.Check(all, DeepEquals, []byte("foo"))
c.Check(blocklen, Equals, int64(3))
- c.Check(err, Equals, nil)
log.Print("Get")
}
- // Close internal listener socket.
- listener.Close()
-
log.Print("TestPutAndGet done")
}
+
+func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
+ log.Print("TestPutAndGet start")
+
+ os.Setenv("ARVADOS_EXTERNAL_CLIENT", "true")
+ kc, err := keepclient.MakeKeepClient()
+ kc.ApiToken = "123xyz"
+ c.Check(kc.External, Equals, true)
+ c.Check(kc.Using_proxy, Equals, true)
+ c.Check(len(kc.Service_roots), Equals, 1)
+ c.Check(kc.Service_roots[0], Equals, "http://localhost:29950")
+ c.Check(err, Equals, nil)
+ os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
+
+ log.Print("keepclient created")
+
+ hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+ {
+ _, _, err := kc.Ask(hash)
+ c.Check(err, Equals, keepclient.BlockNotFound)
+ log.Print("Ask 1")
+ }
+
+ {
+ hash2, rep, err := kc.PutB([]byte("foo"))
+ c.Check(hash2, Equals, hash)
+ c.Check(rep, Equals, 0)
+ c.Check(err, Equals, keepclient.InsufficientReplicasError)
+ log.Print("PutB")
+ }
+
+ {
+ blocklen, _, err := kc.Ask(hash)
+ c.Assert(err, Equals, keepclient.BlockNotFound)
+ c.Check(blocklen, Equals, int64(0))
+ log.Print("Ask 2")
+ }
+
+ {
+ _, blocklen, _, err := kc.Get(hash)
+ c.Assert(err, Equals, keepclient.BlockNotFound)
+ c.Check(blocklen, Equals, int64(0))
+ log.Print("Get")
+ }
+
+ log.Print("TestPutAndGetForbidden done")
+}
commit 14eddaec927be590682e226b9998c3e616c0b72e
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu May 22 11:17:32 2014 -0400
1885: Full-stack integration test (api+keep+keepproxy+keepclient) works!
diff --git a/sdk/go/src/arvados.org/keepclient/support.go b/sdk/go/src/arvados.org/keepclient/support.go
index 89c5e4b..ef4a8e1 100644
--- a/sdk/go/src/arvados.org/keepclient/support.go
+++ b/sdk/go/src/arvados.org/keepclient/support.go
@@ -186,6 +186,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
var resp *http.Response
if resp, err = this.Client.Do(req); err != nil {
upload_status <- uploadStatus{err, url, 0, 0}
+ body.Close()
return
}
@@ -231,7 +232,6 @@ func (this KeepClient) putReplicas(
next_server += 1
active += 1
} else {
- fmt.Print(active)
if active == 0 {
return (this.Want_replicas - remaining_replicas), InsufficientReplicasError
} else {
@@ -251,7 +251,7 @@ func (this KeepClient) putReplicas(
status.url, status.err)
}
active -= 1
- log.Printf("Upload status %v %v %v", status.statusCode, remaining_replicas, active)
+ log.Printf("Upload status code: %v remaining replicas: %v active: %v", status.statusCode, remaining_replicas, active)
}
return this.Want_replicas, nil
diff --git a/services/keep/src/arvados.org/keepproxy/keepproxy.go b/services/keep/src/arvados.org/keepproxy/keepproxy.go
index 4213ce0..8b8ff18 100644
--- a/services/keep/src/arvados.org/keepproxy/keepproxy.go
+++ b/services/keep/src/arvados.org/keepproxy/keepproxy.go
@@ -240,25 +240,27 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
var reader io.ReadCloser
var err error
+ var blocklen int64
if req.Method == "GET" {
- reader, _, _, err = this.KeepClient.AuthorizedGet(hash, signature, timestamp)
+ reader, blocklen, _, err = this.KeepClient.AuthorizedGet(hash, signature, timestamp)
+ defer reader.Close()
} else if req.Method == "HEAD" {
- _, _, err = this.KeepClient.AuthorizedAsk(hash, signature, timestamp)
+ blocklen, _, err = this.KeepClient.AuthorizedAsk(hash, signature, timestamp)
}
+ resp.Header().Set("Content-Length", fmt.Sprint(blocklen))
+
switch err {
case nil:
- io.Copy(resp, reader)
+ if reader != nil {
+ io.Copy(resp, reader)
+ }
case keepclient.BlockNotFound:
http.Error(resp, "Not found", http.StatusNotFound)
default:
http.Error(resp, err.Error(), http.StatusBadGateway)
}
-
- if reader != nil {
- reader.Close()
- }
}
func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
@@ -294,6 +296,8 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
// Now try to put the block through
replicas, err := this.KeepClient.PutHR(hash, req.Body, contentLength)
+ log.Printf("Replicas stored: %v err: %v", replicas, err)
+
// Tell the client how many successful PUTs we accomplished
resp.Header().Set("X-Keep-Replicas-Stored", fmt.Sprintf("%d", replicas))
diff --git a/services/keep/src/arvados.org/keepproxy/keepproxy_test.go b/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
index 256f7f4..9871281 100644
--- a/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
+++ b/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
@@ -2,6 +2,7 @@ package main
import (
"arvados.org/keepclient"
+ "crypto/md5"
"crypto/tls"
"fmt"
. "gopkg.in/check.v1"
@@ -14,6 +15,7 @@ import (
"os/exec"
"strings"
"testing"
+ "time"
)
// Gocheck boilerplate
@@ -94,7 +96,7 @@ func SetupProxyService() {
}
}
-func (s *ServerRequiredSuite) TestPutAndGet(c *C) {
+func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
log.Print("TestPutAndGet start")
os.Setenv("ARVADOS_EXTERNAL_CLIENT", "true")
@@ -111,22 +113,43 @@ func (s *ServerRequiredSuite) TestPutAndGet(c *C) {
os.Args = []string{"keepproxy", "-listen=:29950"}
go main()
- log.Print("keepproxy main started")
-
- hash, rep, err2 := kc.PutB([]byte("foo"))
-
- log.Print("PutB")
+ time.Sleep(100 * time.Millisecond)
- c.Check(rep, Equals, 2)
- c.Check(err2, Equals, nil)
+ log.Print("keepproxy main started")
- reader, blocklen, _, err3 := kc.Get(hash)
- all, err := ioutil.ReadAll(reader)
- c.Check(all, DeepEquals, []byte("foo"))
- c.Check(blocklen, Equals, int64(3))
- c.Check(err3, Equals, nil)
+ hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+ // Uncomment this when actual keep server supports HEAD
+ /*{
+ _, _, err := kc.Ask(hash)
+ c.Check(err, Equals, keepclient.BlockNotFound)
+ log.Print("Ask 1")
+ }*/
+
+ {
+ hash2, rep, err := kc.PutB([]byte("foo"))
+ c.Check(hash2, Equals, hash)
+ c.Check(rep, Equals, 2)
+ c.Check(err, Equals, nil)
+ log.Print("PutB")
+ }
- log.Print("Get")
+ // Uncomment this when actual keep server supports HEAD
+ /*{
+ blocklen, _, err := kc.Ask(hash)
+ c.Check(blocklen, Equals, int64(3))
+ c.Check(err, Equals, nil)
+ log.Print("Ask 2")
+ }*/
+
+ {
+ reader, blocklen, _, err := kc.Get(hash)
+ all, err := ioutil.ReadAll(reader)
+ c.Check(all, DeepEquals, []byte("foo"))
+ c.Check(blocklen, Equals, int64(3))
+ c.Check(err, Equals, nil)
+ log.Print("Get")
+ }
// Close internal listener socket.
listener.Close()
commit 216733543e5348ef8ef25ee5edbe6ed2dd35f5d3
Merge: 34ab7b2 c2e70e0
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu May 22 10:45:33 2014 -0400
Merge remote-tracking branch 'origin/master' into 1885-keep-proxy refs #1885
commit 34ab7b208c7eb14ccae1e31831286acf9ea29487
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu May 22 10:45:18 2014 -0400
1885: Integration test of proxy work in progress.
diff --git a/sdk/go/go.sh b/sdk/go/go.sh
index 0203a1c..89f81fb 100755
--- a/sdk/go/go.sh
+++ b/sdk/go/go.sh
@@ -1,11 +1,11 @@
#! /bin/sh
rootdir=$(dirname $0)
-GOPATH=$rootdir:$rootdir/../../sdk/go:$GOPATH
+GOPATH=$rootdir:$GOPATH
export GOPATH
-mkdir -p pkg
-mkdir -p bin
+mkdir -p $rootdir/pkg
+mkdir -p $rootdir/bin
go get gopkg.in/check.v1
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go
index e16c853..91989bd 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@ -29,6 +29,7 @@ type KeepClient struct {
Want_replicas int
Client *http.Client
Using_proxy bool
+ External bool
}
// Create a new KeepClient, initialized with standard Arvados environment
@@ -37,6 +38,7 @@ type KeepClient struct {
// Keep servers.
func MakeKeepClient() (kc KeepClient, err error) {
insecure := (os.Getenv("ARVADOS_API_HOST_INSECURE") == "true")
+ external := (os.Getenv("ARVADOS_EXTERNAL_CLIENT") == "true")
kc = KeepClient{
ApiServer: os.Getenv("ARVADOS_API_HOST"),
@@ -45,7 +47,8 @@ func MakeKeepClient() (kc KeepClient, err error) {
Want_replicas: 2,
Client: &http.Client{Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}}},
- Using_proxy: false}
+ Using_proxy: false,
+ External: external}
err = (&kc).discoverKeepServers()
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
index 1ef5fd6..c67e09f 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
@@ -111,17 +111,15 @@ func (this StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request
}
func RunBogusKeepServer(st http.Handler, port int) (listener net.Listener, url string) {
- server := http.Server{Handler: st}
-
var err error
listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: port})
if err != nil {
panic(fmt.Sprintf("Could not listen on tcp port %v", port))
}
- url = fmt.Sprintf("http://localhost:%d", listener.Addr().(*net.TCPAddr).Port)
+ url = fmt.Sprintf("http://localhost:%d", port)
- go server.Serve(listener)
+ go http.Serve(listener, st)
return listener, url
}
diff --git a/sdk/go/src/arvados.org/keepclient/support.go b/sdk/go/src/arvados.org/keepclient/support.go
index d0ea967..89c5e4b 100644
--- a/sdk/go/src/arvados.org/keepclient/support.go
+++ b/sdk/go/src/arvados.org/keepclient/support.go
@@ -33,12 +33,15 @@ func (this *KeepClient) discoverKeepServers() error {
var req *http.Request
var err error
- if req, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/keep_services/accessible", this.ApiServer), nil); err != nil {
+ if req, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/keep_services/accessible?format=json", this.ApiServer), nil); err != nil {
return err
}
// Add api token header
req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
+ if this.External {
+ req.Header.Add("X-External-Client", "1")
+ }
// Make the request
var resp *http.Response
diff --git a/services/keep/go.sh b/services/keep/go.sh
index fa6b5f6..177b27d 100755
--- a/services/keep/go.sh
+++ b/services/keep/go.sh
@@ -4,8 +4,8 @@ rootdir=$(dirname $0)
GOPATH=$rootdir:$rootdir/../../sdk/go:$GOPATH
export GOPATH
-mkdir -p pkg
-mkdir -p bin
+mkdir -p $rootdir/pkg
+mkdir -p $rootdir/bin
go get github.com/gorilla/mux
diff --git a/services/keep/src/arvados.org/keepproxy/keepproxy.go b/services/keep/src/arvados.org/keepproxy/keepproxy.go
index c118f22..4213ce0 100644
--- a/services/keep/src/arvados.org/keepproxy/keepproxy.go
+++ b/services/keep/src/arvados.org/keepproxy/keepproxy.go
@@ -7,15 +7,19 @@ import (
"github.com/gorilla/mux"
"io"
"log"
+ "net"
"net/http"
+ "os"
"sync"
"time"
)
// Default TCP address on which to listen for requests.
-// Initialized by the --listen flag.
+// Initialized by the -listen flag.
const DEFAULT_ADDR = ":25107"
+var listener net.Listener
+
func main() {
var (
listen string
@@ -23,6 +27,7 @@ func main() {
no_put bool
no_head bool
default_replicas int
+ pidfile string
)
flag.StringVar(
@@ -32,23 +37,24 @@ func main() {
"Interface on which to listen for requests, in the format "+
"ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
"to listen on all network interfaces.")
+
flag.BoolVar(
&no_get,
"no-get",
true,
- "If true, disable GET operations")
+ "If set, disable GET operations")
flag.BoolVar(
&no_get,
"no-put",
false,
- "If true, disable PUT operations")
+ "If set, disable PUT operations")
flag.BoolVar(
&no_head,
"no-head",
- false,
- "If true, disable HEAD operations")
+ true,
+ "If set, disable HEAD operations")
flag.IntVar(
&default_replicas,
@@ -56,12 +62,18 @@ func main() {
2,
"Default number of replicas to write if not specified by the client.")
+ flag.StringVar(
+ &pidfile,
+ "pid",
+ "",
+ "Path to write pid file")
+
flag.Parse()
- if no_get == false {
- log.Print("Must specify --no-get")
+ /*if no_get == false || no_head == false {
+ log.Print("Must specify -no-get and -no-head")
return
- }
+ }*/
kc, err := keepclient.MakeKeepClient()
if err != nil {
@@ -69,14 +81,26 @@ func main() {
return
}
+ if pidfile != "" {
+ f, err := os.Create(pidfile)
+ if err == nil {
+ fmt.Fprint(f, os.Getpid())
+ f.Close()
+ } else {
+ log.Printf("Error writing pid file (%s): %s", pidfile, err.Error())
+ }
+ }
+
kc.Want_replicas = default_replicas
- // Tell the built-in HTTP server to direct all requests to the REST
- // router.
- http.Handle("/", MakeRESTRouter(!no_get, !no_put, !no_head, kc))
+ listener, err = net.Listen("tcp", listen)
+ if err != nil {
+ log.Printf("Could not listen on %v", listen)
+ return
+ }
// Start listening for requests.
- http.ListenAndServe(listen, nil)
+ http.Serve(listener, MakeRESTRouter(!no_get, !no_put, !no_head, kc))
}
type ApiTokenCache struct {
diff --git a/services/keep/src/arvados.org/keepproxy/keepproxy_test.go b/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
index d4f3ef4..256f7f4 100644
--- a/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
+++ b/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
@@ -2,8 +2,14 @@ package main
import (
"arvados.org/keepclient"
+ "crypto/tls"
"fmt"
. "gopkg.in/check.v1"
+ "io"
+ "io/ioutil"
+ "log"
+ "net/http"
+ "net/url"
"os"
"os/exec"
"strings"
@@ -23,21 +29,107 @@ type ServerRequiredSuite struct{}
func pythonDir() string {
gopath := os.Getenv("GOPATH")
- return fmt.Sprintf("%s/../python", strings.Split(gopath, ":")[0])
+ return fmt.Sprintf("%s/../../sdk/python", strings.Split(gopath, ":")[0])
}
func (s *ServerRequiredSuite) SetUpSuite(c *C) {
+ cwd, _ := os.Getwd()
+ defer os.Chdir(cwd)
+
os.Chdir(pythonDir())
exec.Command("python", "run_test_server.py", "start").Run()
exec.Command("python", "run_test_server.py", "start_keep").Run()
+
+ os.Setenv("ARVADOS_API_HOST", "localhost:3001")
+ os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
+ os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
+
+ SetupProxyService()
}
func (s *ServerRequiredSuite) TearDownSuite(c *C) {
+ cwd, _ := os.Getwd()
+ defer os.Chdir(cwd)
+
os.Chdir(pythonDir())
exec.Command("python", "run_test_server.py", "stop_keep").Run()
exec.Command("python", "run_test_server.py", "stop").Run()
}
+func SetupProxyService() {
+
+ client := &http.Client{Transport: &http.Transport{
+ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}}
+
+ var req *http.Request
+ var err error
+ if req, err = http.NewRequest("POST", fmt.Sprintf("https://%s/arvados/v1/keep_services", os.Getenv("ARVADOS_API_HOST")), nil); err != nil {
+ panic(err.Error())
+ }
+ req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", os.Getenv("ARVADOS_API_TOKEN")))
+
+ reader, writer := io.Pipe()
+
+ req.Body = reader
+
+ go func() {
+ data := url.Values{}
+ data.Set("keep_service", `{
+ "service_host": "localhost",
+ "service_port": 29950,
+ "service_ssl_flag": false,
+ "service_type": "proxy"
+}`)
+
+ writer.Write([]byte(data.Encode()))
+ writer.Close()
+ }()
+
+ var resp *http.Response
+ if resp, err = client.Do(req); err != nil {
+ panic(err.Error())
+ }
+ if resp.StatusCode != 200 {
+ panic(resp.Status)
+ }
+}
+
func (s *ServerRequiredSuite) TestPutAndGet(c *C) {
- kc := keepclient.KeepClient{"localhost", "", true, 29950, nil, 2, nil, true}
+ log.Print("TestPutAndGet start")
+
+ os.Setenv("ARVADOS_EXTERNAL_CLIENT", "true")
+ kc, err := keepclient.MakeKeepClient()
+ c.Check(kc.External, Equals, true)
+ c.Check(kc.Using_proxy, Equals, true)
+ c.Check(len(kc.Service_roots), Equals, 1)
+ c.Check(kc.Service_roots[0], Equals, "http://localhost:29950")
+ c.Check(err, Equals, nil)
+ os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
+
+ log.Print("keepclient created")
+
+ os.Args = []string{"keepproxy", "-listen=:29950"}
+ go main()
+
+ log.Print("keepproxy main started")
+
+ hash, rep, err2 := kc.PutB([]byte("foo"))
+
+ log.Print("PutB")
+
+ c.Check(rep, Equals, 2)
+ c.Check(err2, Equals, nil)
+
+ reader, blocklen, _, err3 := kc.Get(hash)
+ all, err := ioutil.ReadAll(reader)
+ c.Check(all, DeepEquals, []byte("foo"))
+ c.Check(blocklen, Equals, int64(3))
+ c.Check(err3, Equals, nil)
+
+ log.Print("Get")
+
+ // Close internal listener socket.
+ listener.Close()
+
+ log.Print("TestPutAndGet done")
}
commit 45d037864f655373dacd681d753ea3270f4ba997
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed May 21 16:32:23 2014 -0400
1885: Stubbed out integration test for proxy
diff --git a/services/keep/src/arvados.org/keepproxy/keepproxy_test.go b/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
new file mode 100644
index 0000000..d4f3ef4
--- /dev/null
+++ b/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
@@ -0,0 +1,43 @@
+package main
+
+import (
+ "arvados.org/keepclient"
+ "fmt"
+ . "gopkg.in/check.v1"
+ "os"
+ "os/exec"
+ "strings"
+ "testing"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+ TestingT(t)
+}
+
+// Gocheck boilerplate
+var _ = Suite(&ServerRequiredSuite{})
+
+// Tests that require the Keep server running
+type ServerRequiredSuite struct{}
+
+func pythonDir() string {
+ gopath := os.Getenv("GOPATH")
+ return fmt.Sprintf("%s/../python", strings.Split(gopath, ":")[0])
+}
+
+func (s *ServerRequiredSuite) SetUpSuite(c *C) {
+ os.Chdir(pythonDir())
+ exec.Command("python", "run_test_server.py", "start").Run()
+ exec.Command("python", "run_test_server.py", "start_keep").Run()
+}
+
+func (s *ServerRequiredSuite) TearDownSuite(c *C) {
+ os.Chdir(pythonDir())
+ exec.Command("python", "run_test_server.py", "stop_keep").Run()
+ exec.Command("python", "run_test_server.py", "stop").Run()
+}
+
+func (s *ServerRequiredSuite) TestPutAndGet(c *C) {
+ kc := keepclient.KeepClient{"localhost", "", true, 29950, nil, 2, nil, true}
+}
commit 744ca00ccd1e7fda9d4210b18c1f4a734f3b2261
Merge: cd85fc2 3ba5aa1
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed May 21 16:14:03 2014 -0400
Merge branch '2798-go-keep-client' into 1885-keep-proxy refs #1885
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list