[ARVADOS] created: 1.2.0-40-g55f6178b9
Git user
git at public.curoverse.com
Wed Sep 12 16:14:56 EDT 2018
at 55f6178b9a9a0d165e952eeec9a04d0234299397 (commit)
commit 55f6178b9a9a0d165e952eeec9a04d0234299397
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Tue Sep 11 12:55:05 2018 -0400
13994: Fix deadlock in keepstore tests.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/sdk/go/keepclient/discover.go b/sdk/go/keepclient/discover.go
index 2140dceab..6778b39fb 100644
--- a/sdk/go/keepclient/discover.go
+++ b/sdk/go/keepclient/discover.go
@@ -23,7 +23,10 @@ func RefreshServiceDiscovery() {
svcListCacheMtx.Lock()
defer svcListCacheMtx.Unlock()
for _, ent := range svcListCache {
- ent.clear <- struct{}{}
+ select {
+ case ent.clear <- struct{}{}:
+ default:
+ }
}
}
@@ -136,7 +139,7 @@ func (kc *KeepClient) discoverServices() error {
arv := *kc.Arvados
cacheEnt = cachedSvcList{
latest: make(chan svcList),
- clear: make(chan struct{}),
+ clear: make(chan struct{}, 1),
arv: &arv,
}
go cacheEnt.poll()
commit d6993a413a1290f110c52ed7339b09caf8b87b15
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Tue Sep 11 12:54:41 2018 -0400
13994: Proxy to remote cluster if +R hint given.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/sdk/go/arvadostest/fixtures.go b/sdk/go/arvadostest/fixtures.go
index 6a4b6232a..e6984601f 100644
--- a/sdk/go/arvadostest/fixtures.go
+++ b/sdk/go/arvadostest/fixtures.go
@@ -8,6 +8,7 @@ package arvadostest
const (
SpectatorToken = "zw2f4gwx8hw8cjre7yp6v1zylhrhn3m5gvjq73rtpwhmknrybu"
ActiveToken = "3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"
+ ActiveTokenV2 = "v2/zzzzz-gj3su-077z32aux8dg2s1/3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"
AdminToken = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h"
AnonymousToken = "4kg6k6lzmp9kj4cpkcoxie964cmvjahbt4fod9zru44k4jqdmi"
DataManagerToken = "320mkve8qkswstz7ff61glpk3mhgghmg67wmic7elw4z41pke1"
diff --git a/sdk/go/arvadostest/integration_test_cluster.go b/sdk/go/arvadostest/integration_test_cluster.go
new file mode 100644
index 000000000..ac08ce184
--- /dev/null
+++ b/sdk/go/arvadostest/integration_test_cluster.go
@@ -0,0 +1,21 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvadostest
+
+import (
+ "os"
+ "path/filepath"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ check "gopkg.in/check.v1"
+)
+
+func IntegrationTestCluster(c *check.C) *arvados.Cluster {
+ config, err := arvados.GetConfig(filepath.Join(os.Getenv("WORKSPACE"), "tmp", "arvados.yml"))
+ c.Assert(err, check.IsNil)
+ cluster, err := config.GetCluster("")
+ c.Assert(err, check.IsNil)
+ return cluster
+}
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index f012ea390..c37a4d112 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -30,6 +30,10 @@ import (
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
)
+var testCluster = &arvados.Cluster{
+ ClusterID: "zzzzz",
+}
+
// A RequestTester represents the parameters for an HTTP request to
// be issued on behalf of a unit test.
type RequestTester struct {
@@ -823,7 +827,7 @@ func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder {
if rt.apiToken != "" {
req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
}
- loggingRouter := MakeRESTRouter()
+ loggingRouter := MakeRESTRouter(testCluster)
loggingRouter.ServeHTTP(response, req)
return response
}
@@ -835,7 +839,7 @@ func IssueHealthCheckRequest(rt *RequestTester) *httptest.ResponseRecorder {
if rt.apiToken != "" {
req.Header.Set("Authorization", "Bearer "+rt.apiToken)
}
- loggingRouter := MakeRESTRouter()
+ loggingRouter := MakeRESTRouter(testCluster)
loggingRouter.ServeHTTP(response, req)
return response
}
@@ -975,7 +979,7 @@ func TestGetHandlerClientDisconnect(t *testing.T) {
ok := make(chan struct{})
go func() {
req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
- MakeRESTRouter().ServeHTTP(resp, req)
+ MakeRESTRouter(testCluster).ServeHTTP(resp, req)
ok <- struct{}{}
}()
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index c31ab9c2e..2426c9cbd 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -4,13 +4,6 @@
package main
-// REST handlers for Keep are implemented here.
-//
-// GetBlockHandler (GET /locator)
-// PutBlockHandler (PUT /locator)
-// IndexHandler (GET /index, GET /index/prefix)
-// StatusHandler (GET /status.json)
-
import (
"container/list"
"context"
@@ -29,27 +22,33 @@ import (
"github.com/gorilla/mux"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/health"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
)
type router struct {
*mux.Router
- limiter httpserver.RequestCounter
+ limiter httpserver.RequestCounter
+ cluster *arvados.Cluster
+ remoteProxy remoteProxy
}
// MakeRESTRouter returns a new router that forwards all Keep requests
// to the appropriate handlers.
-func MakeRESTRouter() http.Handler {
- rtr := &router{Router: mux.NewRouter()}
+func MakeRESTRouter(cluster *arvados.Cluster) http.Handler {
+ rtr := &router{
+ Router: mux.NewRouter(),
+ cluster: cluster,
+ }
rtr.HandleFunc(
- `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
+ `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
rtr.HandleFunc(
`/{hash:[0-9a-f]{32}}+{hints}`,
- GetBlockHandler).Methods("GET", "HEAD")
+ rtr.handleGET).Methods("GET", "HEAD")
- rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
+ rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
// List all blocks stored here. Privileged client only.
rtr.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
@@ -98,11 +97,16 @@ func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
}
-// GetBlockHandler is a HandleFunc to address Get block requests.
-func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
+func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
ctx, cancel := contextForResponse(context.TODO(), resp)
defer cancel()
+ locator := req.URL.Path[1:]
+ if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
+ rtr.remoteProxy.Get(resp, req, rtr.cluster)
+ return
+ }
+
if theConfig.RequireSignatures {
locator := req.URL.Path[1:] // strip leading slash
if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
@@ -177,8 +181,7 @@ func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([
}
}
-// PutBlockHandler is a HandleFunc to address Put block requests.
-func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
+func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
ctx, cancel := contextForResponse(context.TODO(), resp)
defer cancel()
@@ -826,7 +829,7 @@ func IsValidLocator(loc string) bool {
return validLocatorRe.MatchString(loc)
}
-var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
+var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
// GetAPIToken returns the OAuth2 token from the Authorization
// header of a HTTP request, or an empty string if no matching
@@ -834,7 +837,7 @@ var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
func GetAPIToken(req *http.Request) string {
if auth, ok := req.Header["Authorization"]; ok {
if match := authRe.FindStringSubmatch(auth[0]); match != nil {
- return match[1]
+ return match[2]
}
}
return ""
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 79e3017d5..6ae414bf9 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -13,6 +13,7 @@ import (
"syscall"
"time"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/config"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
@@ -149,6 +150,22 @@ func main() {
}
}
+ var cluster *arvados.Cluster
+ cfg, err := arvados.GetConfig(arvados.DefaultConfigFile)
+ if err != nil && os.IsNotExist(err) {
+ log.Warnf("DEPRECATED: proceeding without cluster configuration file %q (%s)", arvados.DefaultConfigFile, err)
+ cluster = &arvados.Cluster{
+ ClusterID: "xxxxx",
+ }
+ } else if err != nil {
+ log.Fatalf("load config %q: %s", arvados.DefaultConfigFile, err)
+ } else {
+ cluster, err = cfg.GetCluster("")
+ if err != nil {
+ log.Fatalf("config error in %q: %s", arvados.DefaultConfigFile, err)
+ }
+ }
+
log.Println("keepstore starting, pid", os.Getpid())
defer log.Println("keepstore exiting, pid", os.Getpid())
@@ -156,7 +173,7 @@ func main() {
KeepVM = MakeRRVolumeManager(theConfig.Volumes)
// Middleware/handler stack
- router := MakeRESTRouter()
+ router := MakeRESTRouter(cluster)
// Set up a TCP listener.
listener, err := net.Listen("tcp", theConfig.Listen)
diff --git a/services/keepstore/mounts_test.go b/services/keepstore/mounts_test.go
index 0f7b6e973..9fa0090aa 100644
--- a/services/keepstore/mounts_test.go
+++ b/services/keepstore/mounts_test.go
@@ -28,7 +28,7 @@ func (s *MountsSuite) SetUpTest(c *check.C) {
theConfig = DefaultConfig()
theConfig.systemAuthToken = arvadostest.DataManagerToken
theConfig.Start()
- s.rtr = MakeRESTRouter()
+ s.rtr = MakeRESTRouter(testCluster)
}
func (s *MountsSuite) TearDownTest(c *check.C) {
diff --git a/services/keepstore/proxy_remote.go b/services/keepstore/proxy_remote.go
new file mode 100644
index 000000000..2e3d66351
--- /dev/null
+++ b/services/keepstore/proxy_remote.go
@@ -0,0 +1,113 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+ "io"
+ "net/http"
+ "strings"
+ "sync"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/auth"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+)
+
+type remoteProxy struct {
+ clients map[string]*keepclient.KeepClient
+ mtx sync.Mutex
+}
+
+func (rp *remoteProxy) Get(w http.ResponseWriter, r *http.Request, cluster *arvados.Cluster) {
+ var remoteClient *keepclient.KeepClient
+ var parts []string
+ for i, part := range strings.Split(r.URL.Path[1:], "+") {
+ switch {
+ case i == 0:
+ // don't try to parse hash part as hint
+ case strings.HasPrefix(part, "A"):
+ // drop local permission hint
+ continue
+ case len(part) > 7 && part[0] == 'R' && part[6] == '-':
+ remoteID := part[1:6]
+ remote, ok := cluster.RemoteClusters[remoteID]
+ if !ok {
+ http.Error(w, "remote cluster not configured", http.StatusBadGateway)
+ return
+ }
+ token := GetAPIToken(r)
+ if token == "" {
+ http.Error(w, "no token provided in Authorization header", http.StatusUnauthorized)
+ return
+ }
+ kc, err := rp.remoteClient(remoteID, remote, token)
+ if err == auth.ErrObsoleteToken {
+ http.Error(w, err.Error(), http.StatusBadRequest)
+ return
+ } else if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ remoteClient = kc
+ part = "A" + part[7:]
+ }
+ parts = append(parts, part)
+ }
+ if remoteClient == nil {
+ http.Error(w, "bad request", http.StatusBadRequest)
+ return
+ }
+ locator := strings.Join(parts, "+")
+ rdr, _, _, err := remoteClient.Get(locator)
+ switch err.(type) {
+ case nil:
+ defer rdr.Close()
+ io.Copy(w, rdr)
+ case *keepclient.ErrNotFound:
+ http.Error(w, err.Error(), http.StatusNotFound)
+ default:
+ http.Error(w, err.Error(), http.StatusBadGateway)
+ }
+}
+
+func (rp *remoteProxy) remoteClient(remoteID string, remoteCluster arvados.RemoteCluster, token string) (*keepclient.KeepClient, error) {
+ rp.mtx.Lock()
+ kc, ok := rp.clients[remoteID]
+ rp.mtx.Unlock()
+ if !ok {
+ c := &arvados.Client{
+ APIHost: remoteCluster.Host,
+ AuthToken: "xxx",
+ Insecure: remoteCluster.Insecure,
+ }
+ ac, err := arvadosclient.New(c)
+ if err != nil {
+ return nil, err
+ }
+ kc, err = keepclient.MakeKeepClient(ac)
+ if err != nil {
+ return nil, err
+ }
+
+ rp.mtx.Lock()
+ if rp.clients == nil {
+ rp.clients = map[string]*keepclient.KeepClient{remoteID: kc}
+ } else {
+ rp.clients[remoteID] = kc
+ }
+ rp.mtx.Unlock()
+ }
+ accopy := *kc.Arvados
+ accopy.ApiToken = token
+ kccopy := *kc
+ kccopy.Arvados = &accopy
+ token, err := auth.SaltToken(token, remoteID)
+ if err != nil {
+ return nil, err
+ }
+ kccopy.Arvados.ApiToken = token
+ return &kccopy, nil
+}
diff --git a/services/keepstore/proxy_remote_test.go b/services/keepstore/proxy_remote_test.go
new file mode 100644
index 000000000..84c84d653
--- /dev/null
+++ b/services/keepstore/proxy_remote_test.go
@@ -0,0 +1,149 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+ "crypto/md5"
+ "encoding/json"
+ "fmt"
+ "net"
+ "net/http"
+ "net/http/httptest"
+ "strconv"
+ "strings"
+ "sync/atomic"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/auth"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&ProxyRemoteSuite{})
+
+type ProxyRemoteSuite struct {
+ cluster *arvados.Cluster
+ vm VolumeManager
+ rtr http.Handler
+
+ remoteClusterID string
+ remoteBlobSigningKey []byte
+ remoteKeepLocator string
+ remoteKeepData []byte
+ remoteKeepproxy *httptest.Server
+ remoteKeepRequests int64
+ remoteAPI *httptest.Server
+}
+
+func (s *ProxyRemoteSuite) remoteKeepproxyHandler(w http.ResponseWriter, r *http.Request) {
+ expectToken, err := auth.SaltToken(arvadostest.ActiveTokenV2, s.remoteClusterID)
+ if err != nil {
+ panic(err)
+ }
+ atomic.AddInt64(&s.remoteKeepRequests, 1)
+ var token string
+ if auth := strings.Split(r.Header.Get("Authorization"), " "); len(auth) == 2 && (auth[0] == "OAuth2" || auth[0] == "Bearer") {
+ token = auth[1]
+ }
+ if r.Method == "GET" && r.URL.Path == "/"+s.remoteKeepLocator && token == expectToken {
+ w.Write(s.remoteKeepData)
+ return
+ }
+ http.Error(w, "404", 404)
+}
+
+func (s *ProxyRemoteSuite) remoteAPIHandler(w http.ResponseWriter, r *http.Request) {
+ host, port, _ := net.SplitHostPort(strings.Split(s.remoteKeepproxy.URL, "//")[1])
+ portnum, _ := strconv.Atoi(port)
+ if r.URL.Path == "/arvados/v1/discovery/v1/rest" {
+ json.NewEncoder(w).Encode(arvados.DiscoveryDocument{})
+ return
+ }
+ if r.URL.Path == "/arvados/v1/keep_services/accessible" {
+ json.NewEncoder(w).Encode(arvados.KeepServiceList{
+ Items: []arvados.KeepService{
+ {
+ UUID: s.remoteClusterID + "-bi6l4-proxyproxyproxy",
+ ServiceType: "proxy",
+ ServiceHost: host,
+ ServicePort: portnum,
+ ServiceSSLFlag: false,
+ },
+ },
+ })
+ return
+ }
+ http.Error(w, "404", 404)
+}
+
+func (s *ProxyRemoteSuite) SetUpTest(c *check.C) {
+ s.remoteClusterID = "z0000"
+ s.remoteBlobSigningKey = []byte("3b6df6fb6518afe12922a5bc8e67bf180a358bc8")
+ s.remoteKeepproxy = httptest.NewServer(http.HandlerFunc(s.remoteKeepproxyHandler))
+ s.remoteAPI = httptest.NewUnstartedServer(http.HandlerFunc(s.remoteAPIHandler))
+ s.remoteAPI.StartTLS()
+ s.cluster = arvadostest.IntegrationTestCluster(c)
+ s.cluster.RemoteClusters = map[string]arvados.RemoteCluster{
+ s.remoteClusterID: arvados.RemoteCluster{
+ Host: strings.Split(s.remoteAPI.URL, "//")[1],
+ Proxy: true,
+ Scheme: "http",
+ Insecure: true,
+ },
+ }
+ s.vm = MakeTestVolumeManager(2)
+ KeepVM = s.vm
+ theConfig = DefaultConfig()
+ theConfig.systemAuthToken = arvadostest.DataManagerToken
+ theConfig.Start()
+ s.rtr = MakeRESTRouter(s.cluster)
+}
+
+func (s *ProxyRemoteSuite) TearDownTest(c *check.C) {
+ s.vm.Close()
+ KeepVM = nil
+ theConfig = DefaultConfig()
+ theConfig.Start()
+ s.remoteAPI.Close()
+ s.remoteKeepproxy.Close()
+}
+
+func (s *ProxyRemoteSuite) TestProxyRemote(c *check.C) {
+ data := []byte("foo bar")
+ s.remoteKeepData = data
+ locator := fmt.Sprintf("%x+%d", md5.Sum(data), len(data))
+ s.remoteKeepLocator = keepclient.SignLocator(locator, arvadostest.ActiveTokenV2, time.Now().Add(time.Minute), time.Minute, s.remoteBlobSigningKey)
+
+ path := "/" + strings.Replace(s.remoteKeepLocator, "+A", "+R"+s.remoteClusterID+"-", 1)
+
+ var req *http.Request
+ var resp *httptest.ResponseRecorder
+ tryWithToken := func(token string) {
+ req = httptest.NewRequest("GET", path, nil)
+ req.Header.Set("Authorization", "Bearer "+token)
+ resp = httptest.NewRecorder()
+ s.rtr.ServeHTTP(resp, req)
+ }
+
+ // Happy path
+ tryWithToken(arvadostest.ActiveTokenV2)
+ c.Check(s.remoteKeepRequests, check.Equals, int64(1))
+ c.Check(resp.Code, check.Equals, http.StatusOK)
+ c.Check(resp.Body.String(), check.Equals, string(data))
+
+ // Obsolete token
+ tryWithToken(arvadostest.ActiveToken)
+ c.Check(s.remoteKeepRequests, check.Equals, int64(1))
+ c.Check(resp.Code, check.Equals, http.StatusBadRequest)
+ c.Check(resp.Body.String(), check.Not(check.Equals), string(data))
+
+ // Bad token
+ tryWithToken(arvadostest.ActiveTokenV2[:len(arvadostest.ActiveTokenV2)-3] + "xxx")
+ c.Check(s.remoteKeepRequests, check.Equals, int64(2))
+ c.Check(resp.Code, check.Equals, http.StatusNotFound)
+ c.Check(resp.Body.String(), check.Not(check.Equals), string(data))
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list