[ARVADOS] created: 1.2.0-32-g25ab9165f

Git user git at public.curoverse.com
Wed Sep 12 16:08:19 EDT 2018


        at  25ab9165fc606298e9ee3fe5905740807d7d2e8c (commit)


commit 25ab9165fc606298e9ee3fe5905740807d7d2e8c
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Tue Sep 11 12:55:05 2018 -0400

    13994: Fix deadlock in keepstore tests.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/sdk/go/keepclient/discover.go b/sdk/go/keepclient/discover.go
index 2140dceab..6778b39fb 100644
--- a/sdk/go/keepclient/discover.go
+++ b/sdk/go/keepclient/discover.go
@@ -23,7 +23,10 @@ func RefreshServiceDiscovery() {
 	svcListCacheMtx.Lock()
 	defer svcListCacheMtx.Unlock()
 	for _, ent := range svcListCache {
-		ent.clear <- struct{}{}
+		select {
+		case ent.clear <- struct{}{}:
+		default:
+		}
 	}
 }
 
@@ -136,7 +139,7 @@ func (kc *KeepClient) discoverServices() error {
 		arv := *kc.Arvados
 		cacheEnt = cachedSvcList{
 			latest: make(chan svcList),
-			clear:  make(chan struct{}),
+			clear:  make(chan struct{}, 1),
 			arv:    &arv,
 		}
 		go cacheEnt.poll()

commit d9ef1b8a19fb2659f45b98b8963aca3fbab0e826
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Tue Sep 11 12:54:41 2018 -0400

    13994: Proxy to remote cluster if +R hint given.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/sdk/go/arvadostest/fixtures.go b/sdk/go/arvadostest/fixtures.go
index 6a4b6232a..e6984601f 100644
--- a/sdk/go/arvadostest/fixtures.go
+++ b/sdk/go/arvadostest/fixtures.go
@@ -8,6 +8,7 @@ package arvadostest
 const (
 	SpectatorToken          = "zw2f4gwx8hw8cjre7yp6v1zylhrhn3m5gvjq73rtpwhmknrybu"
 	ActiveToken             = "3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"
+	ActiveTokenV2           = "v2/zzzzz-gj3su-077z32aux8dg2s1/3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"
 	AdminToken              = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h"
 	AnonymousToken          = "4kg6k6lzmp9kj4cpkcoxie964cmvjahbt4fod9zru44k4jqdmi"
 	DataManagerToken        = "320mkve8qkswstz7ff61glpk3mhgghmg67wmic7elw4z41pke1"
diff --git a/sdk/go/arvadostest/integration_test_cluster.go b/sdk/go/arvadostest/integration_test_cluster.go
new file mode 100644
index 000000000..ac08ce184
--- /dev/null
+++ b/sdk/go/arvadostest/integration_test_cluster.go
@@ -0,0 +1,21 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvadostest
+
+import (
+	"os"
+	"path/filepath"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	check "gopkg.in/check.v1"
+)
+
+// IntegrationTestCluster loads the configuration file found at
+// $WORKSPACE/tmp/arvados.yml and returns the cluster selected by
+// GetCluster(""). Errors from either step are reported through
+// c.Assert.
+func IntegrationTestCluster(c *check.C) *arvados.Cluster {
+	path := filepath.Join(os.Getenv("WORKSPACE"), "tmp", "arvados.yml")
+	cfg, err := arvados.GetConfig(path)
+	c.Assert(err, check.IsNil)
+
+	clstr, err := cfg.GetCluster("")
+	c.Assert(err, check.IsNil)
+	return clstr
+}
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index f012ea390..c37a4d112 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -30,6 +30,10 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
 )
 
+var testCluster = &arvados.Cluster{
+	ClusterID: "zzzzz",
+}
+
 // A RequestTester represents the parameters for an HTTP request to
 // be issued on behalf of a unit test.
 type RequestTester struct {
@@ -823,7 +827,7 @@ func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder {
 	if rt.apiToken != "" {
 		req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
 	}
-	loggingRouter := MakeRESTRouter()
+	loggingRouter := MakeRESTRouter(testCluster)
 	loggingRouter.ServeHTTP(response, req)
 	return response
 }
@@ -835,7 +839,7 @@ func IssueHealthCheckRequest(rt *RequestTester) *httptest.ResponseRecorder {
 	if rt.apiToken != "" {
 		req.Header.Set("Authorization", "Bearer "+rt.apiToken)
 	}
-	loggingRouter := MakeRESTRouter()
+	loggingRouter := MakeRESTRouter(testCluster)
 	loggingRouter.ServeHTTP(response, req)
 	return response
 }
@@ -975,7 +979,7 @@ func TestGetHandlerClientDisconnect(t *testing.T) {
 	ok := make(chan struct{})
 	go func() {
 		req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
-		MakeRESTRouter().ServeHTTP(resp, req)
+		MakeRESTRouter(testCluster).ServeHTTP(resp, req)
 		ok <- struct{}{}
 	}()
 
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index c31ab9c2e..2426c9cbd 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -4,13 +4,6 @@
 
 package main
 
-// REST handlers for Keep are implemented here.
-//
-// GetBlockHandler (GET /locator)
-// PutBlockHandler (PUT /locator)
-// IndexHandler    (GET /index, GET /index/prefix)
-// StatusHandler   (GET /status.json)
-
 import (
 	"container/list"
 	"context"
@@ -29,27 +22,33 @@ import (
 
 	"github.com/gorilla/mux"
 
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
 	"git.curoverse.com/arvados.git/sdk/go/health"
 	"git.curoverse.com/arvados.git/sdk/go/httpserver"
 )
 
 type router struct {
 	*mux.Router
-	limiter httpserver.RequestCounter
+	limiter     httpserver.RequestCounter
+	cluster     *arvados.Cluster
+	remoteProxy remoteProxy
 }
 
 // MakeRESTRouter returns a new router that forwards all Keep requests
 // to the appropriate handlers.
-func MakeRESTRouter() http.Handler {
-	rtr := &router{Router: mux.NewRouter()}
+func MakeRESTRouter(cluster *arvados.Cluster) http.Handler {
+	rtr := &router{
+		Router:  mux.NewRouter(),
+		cluster: cluster,
+	}
 
 	rtr.HandleFunc(
-		`/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
+		`/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
 	rtr.HandleFunc(
 		`/{hash:[0-9a-f]{32}}+{hints}`,
-		GetBlockHandler).Methods("GET", "HEAD")
+		rtr.handleGET).Methods("GET", "HEAD")
 
-	rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
+	rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
 	rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
 	// List all blocks stored here. Privileged client only.
 	rtr.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
@@ -98,11 +97,16 @@ func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
 	http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
 }
 
-// GetBlockHandler is a HandleFunc to address Get block requests.
-func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
+func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
 	ctx, cancel := contextForResponse(context.TODO(), resp)
 	defer cancel()
 
+	locator := req.URL.Path[1:]
+	if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
+		rtr.remoteProxy.Get(resp, req, rtr.cluster)
+		return
+	}
+
 	if theConfig.RequireSignatures {
 		locator := req.URL.Path[1:] // strip leading slash
 		if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
@@ -177,8 +181,7 @@ func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([
 	}
 }
 
-// PutBlockHandler is a HandleFunc to address Put block requests.
-func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
+func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
 	ctx, cancel := contextForResponse(context.TODO(), resp)
 	defer cancel()
 
@@ -826,7 +829,7 @@ func IsValidLocator(loc string) bool {
 	return validLocatorRe.MatchString(loc)
 }
 
-var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
+var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
 
 // GetAPIToken returns the OAuth2 token from the Authorization
 // header of a HTTP request, or an empty string if no matching
@@ -834,7 +837,7 @@ var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
 func GetAPIToken(req *http.Request) string {
 	if auth, ok := req.Header["Authorization"]; ok {
 		if match := authRe.FindStringSubmatch(auth[0]); match != nil {
-			return match[1]
+			return match[2]
 		}
 	}
 	return ""
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 79e3017d5..6ae414bf9 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -13,6 +13,7 @@ import (
 	"syscall"
 	"time"
 
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/config"
 	"git.curoverse.com/arvados.git/sdk/go/keepclient"
@@ -149,6 +150,22 @@ func main() {
 		}
 	}
 
+	var cluster *arvados.Cluster
+	cfg, err := arvados.GetConfig(arvados.DefaultConfigFile)
+	if err != nil && os.IsNotExist(err) {
+		log.Warnf("DEPRECATED: proceeding without cluster configuration file %q (%s)", arvados.DefaultConfigFile, err)
+		cluster = &arvados.Cluster{
+			ClusterID: "xxxxx",
+		}
+	} else if err != nil {
+		log.Fatalf("load config %q: %s", arvados.DefaultConfigFile, err)
+	} else {
+		cluster, err = cfg.GetCluster("")
+		if err != nil {
+			log.Fatalf("config error in %q: %s", arvados.DefaultConfigFile, err)
+		}
+	}
+
 	log.Println("keepstore starting, pid", os.Getpid())
 	defer log.Println("keepstore exiting, pid", os.Getpid())
 
@@ -156,7 +173,7 @@ func main() {
 	KeepVM = MakeRRVolumeManager(theConfig.Volumes)
 
 	// Middleware/handler stack
-	router := MakeRESTRouter()
+	router := MakeRESTRouter(cluster)
 
 	// Set up a TCP listener.
 	listener, err := net.Listen("tcp", theConfig.Listen)
diff --git a/services/keepstore/mounts_test.go b/services/keepstore/mounts_test.go
index 0f7b6e973..9fa0090aa 100644
--- a/services/keepstore/mounts_test.go
+++ b/services/keepstore/mounts_test.go
@@ -28,7 +28,7 @@ func (s *MountsSuite) SetUpTest(c *check.C) {
 	theConfig = DefaultConfig()
 	theConfig.systemAuthToken = arvadostest.DataManagerToken
 	theConfig.Start()
-	s.rtr = MakeRESTRouter()
+	s.rtr = MakeRESTRouter(testCluster)
 }
 
 func (s *MountsSuite) TearDownTest(c *check.C) {
diff --git a/services/keepstore/proxy_remote.go b/services/keepstore/proxy_remote.go
new file mode 100644
index 000000000..2e3d66351
--- /dev/null
+++ b/services/keepstore/proxy_remote.go
@@ -0,0 +1,113 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+	"io"
+	"net/http"
+	"strings"
+	"sync"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/auth"
+	"git.curoverse.com/arvados.git/sdk/go/keepclient"
+)
+
+// remoteProxy fetches blocks from remote clusters on behalf of local
+// clients. The zero value is ready to use: remoteClient creates the
+// clients map the first time it needs to store an entry.
+type remoteProxy struct {
+	// clients holds one KeepClient per remote cluster ID, as stored
+	// by remoteClient.
+	clients map[string]*keepclient.KeepClient
+	// mtx is held around every read and write of clients.
+	mtx     sync.Mutex
+}
+
+// Get serves a block read by fetching the block from a remote
+// cluster and copying it to w.
+//
+// The locator is taken from the request path (without its leading
+// slash) and split on "+". It is rewritten before being sent to the
+// remote cluster:
+//
+//   - the first part (the hash) is kept unchanged;
+//   - parts starting with "A" are dropped;
+//   - a part of the form "R" + 5-character cluster ID + "-" + rest
+//     selects the remote cluster and is replaced by "A" + rest;
+//   - all other parts are kept unchanged.
+//
+// If more than one "R" part matches, the client for the last one is
+// used.
+//
+// Error responses written by this method:
+//
+//   - 502 if the cluster ID is not a key of cluster.RemoteClusters,
+//     or the remote fetch fails with anything other than
+//     *keepclient.ErrNotFound;
+//   - 401 if GetAPIToken finds no token in the request;
+//   - 400 if remoteClient returns auth.ErrObsoleteToken, or if no
+//     part matched the remote hint form;
+//   - 500 for any other error from remoteClient;
+//   - 404 if the remote fetch returns *keepclient.ErrNotFound.
+func (rp *remoteProxy) Get(w http.ResponseWriter, r *http.Request, cluster *arvados.Cluster) {
+	var remoteClient *keepclient.KeepClient
+	var parts []string
+	for i, part := range strings.Split(r.URL.Path[1:], "+") {
+		switch {
+		case i == 0:
+			// don't try to parse hash part as hint
+		case strings.HasPrefix(part, "A"):
+			// drop local permission hint
+			continue
+		case len(part) > 7 && part[0] == 'R' && part[6] == '-':
+			// remote hint: bytes 1..5 are the cluster ID, byte 6
+			// is the "-" separator, the rest is passed on below
+			remoteID := part[1:6]
+			remote, ok := cluster.RemoteClusters[remoteID]
+			if !ok {
+				http.Error(w, "remote cluster not configured", http.StatusBadGateway)
+				return
+			}
+			token := GetAPIToken(r)
+			if token == "" {
+				http.Error(w, "no token provided in Authorization header", http.StatusUnauthorized)
+				return
+			}
+			kc, err := rp.remoteClient(remoteID, remote, token)
+			if err == auth.ErrObsoleteToken {
+				http.Error(w, err.Error(), http.StatusBadRequest)
+				return
+			} else if err != nil {
+				http.Error(w, err.Error(), http.StatusInternalServerError)
+				return
+			}
+			remoteClient = kc
+			// "Rxxxxx-<rest>" becomes "A<rest>" in the locator
+			// sent to the remote cluster
+			part = "A" + part[7:]
+		}
+		parts = append(parts, part)
+	}
+	if remoteClient == nil {
+		// no part of the locator matched the remote hint form
+		http.Error(w, "bad request", http.StatusBadRequest)
+		return
+	}
+	locator := strings.Join(parts, "+")
+	rdr, _, _, err := remoteClient.Get(locator)
+	switch err.(type) {
+	case nil:
+		defer rdr.Close()
+		// NOTE(review): the result of io.Copy is discarded, so a
+		// failure part-way through the copy is not reported here.
+		io.Copy(w, rdr)
+	case *keepclient.ErrNotFound:
+		http.Error(w, err.Error(), http.StatusNotFound)
+	default:
+		http.Error(w, err.Error(), http.StatusBadGateway)
+	}
+}
+
+// remoteClient returns a KeepClient for the remote cluster remoteID
+// whose API token is the caller's token after auth.SaltToken(token,
+// remoteID).
+//
+// One base client per remote cluster is built on first use from
+// remoteCluster.Host and remoteCluster.Insecure and kept in
+// rp.clients. Every call returns a private copy of that base client
+// (and of its Arvados client), so setting the per-request token
+// never modifies the cached one.
+//
+// The token is salted before any client is looked up or built, so a
+// token that SaltToken rejects (Get checks for auth.ErrObsoleteToken)
+// is refused without constructing a client, and the unsalted token
+// is never stored in the returned client. The SaltToken error is
+// returned unchanged so callers can compare it directly.
+func (rp *remoteProxy) remoteClient(remoteID string, remoteCluster arvados.RemoteCluster, token string) (*keepclient.KeepClient, error) {
+	salted, err := auth.SaltToken(token, remoteID)
+	if err != nil {
+		return nil, err
+	}
+
+	rp.mtx.Lock()
+	kc, ok := rp.clients[remoteID]
+	rp.mtx.Unlock()
+	if !ok {
+		// The lock is not held while the client is built, so two
+		// concurrent callers may each build one; the later store
+		// below replaces the earlier.
+		ac, err := arvadosclient.New(&arvados.Client{
+			APIHost: remoteCluster.Host,
+			// Placeholder: the token actually used is set on
+			// the per-request copy below.
+			AuthToken: "xxx",
+			Insecure:  remoteCluster.Insecure,
+		})
+		if err != nil {
+			return nil, err
+		}
+		kc, err = keepclient.MakeKeepClient(ac)
+		if err != nil {
+			return nil, err
+		}
+
+		rp.mtx.Lock()
+		if rp.clients == nil {
+			rp.clients = make(map[string]*keepclient.KeepClient)
+		}
+		rp.clients[remoteID] = kc
+		rp.mtx.Unlock()
+	}
+
+	// Copy both the KeepClient and its Arvados client so the salted
+	// token is only visible to this request.
+	accopy := *kc.Arvados
+	accopy.ApiToken = salted
+	kccopy := *kc
+	kccopy.Arvados = &accopy
+	return &kccopy, nil
+}
diff --git a/services/keepstore/proxy_remote_test.go b/services/keepstore/proxy_remote_test.go
new file mode 100644
index 000000000..84c84d653
--- /dev/null
+++ b/services/keepstore/proxy_remote_test.go
@@ -0,0 +1,149 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+	"crypto/md5"
+	"encoding/json"
+	"fmt"
+	"net"
+	"net/http"
+	"net/http/httptest"
+	"strconv"
+	"strings"
+	"sync/atomic"
+	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+	"git.curoverse.com/arvados.git/sdk/go/auth"
+	"git.curoverse.com/arvados.git/sdk/go/keepclient"
+	check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&ProxyRemoteSuite{})
+
+// ProxyRemoteSuite tests the router's handling of locators with a
+// remote (+R) hint, using stub HTTP servers in place of a remote
+// cluster.
+type ProxyRemoteSuite struct {
+	cluster *arvados.Cluster
+	vm      VolumeManager
+	rtr     http.Handler
+
+	// State of the stub remote cluster. remoteKeepproxy serves
+	// remoteKeepData for GET /remoteKeepLocator and counts every
+	// request it receives in remoteKeepRequests (updated with
+	// atomic.AddInt64). remoteAPI answers the discovery and
+	// keep_services/accessible requests.
+	remoteClusterID      string
+	remoteBlobSigningKey []byte
+	remoteKeepLocator    string
+	remoteKeepData       []byte
+	remoteKeepproxy      *httptest.Server
+	remoteKeepRequests   int64
+	remoteAPI            *httptest.Server
+}
+
+// remoteKeepproxyHandler is the stub remote keepproxy. It counts
+// every request in s.remoteKeepRequests, then responds with
+// s.remoteKeepData only for a GET of "/"+s.remoteKeepLocator whose
+// Authorization header ("OAuth2 <token>" or "Bearer <token>")
+// carries ActiveTokenV2 salted for s.remoteClusterID. Anything else
+// gets a 404.
+func (s *ProxyRemoteSuite) remoteKeepproxyHandler(w http.ResponseWriter, r *http.Request) {
+	want, err := auth.SaltToken(arvadostest.ActiveTokenV2, s.remoteClusterID)
+	if err != nil {
+		panic(err)
+	}
+	atomic.AddInt64(&s.remoteKeepRequests, 1)
+
+	// Extract the token from a two-field "<scheme> <token>" header.
+	got := ""
+	fields := strings.Split(r.Header.Get("Authorization"), " ")
+	if len(fields) == 2 {
+		switch fields[0] {
+		case "OAuth2", "Bearer":
+			got = fields[1]
+		}
+	}
+
+	if r.Method != "GET" || r.URL.Path != "/"+s.remoteKeepLocator || got != want {
+		http.Error(w, "404", 404)
+		return
+	}
+	w.Write(s.remoteKeepData)
+}
+
+// remoteAPIHandler is the stub remote API server. It answers the
+// discovery document request with an empty document, and the
+// keep_services/accessible request with a single "proxy" service
+// pointing at the host and port of s.remoteKeepproxy. Every other
+// path gets a 404.
+func (s *ProxyRemoteSuite) remoteAPIHandler(w http.ResponseWriter, r *http.Request) {
+	// Host and port of the stub keepproxy, taken from its URL.
+	hostport := strings.Split(s.remoteKeepproxy.URL, "//")[1]
+	host, port, _ := net.SplitHostPort(hostport)
+	portnum, _ := strconv.Atoi(port)
+
+	switch r.URL.Path {
+	case "/arvados/v1/discovery/v1/rest":
+		json.NewEncoder(w).Encode(arvados.DiscoveryDocument{})
+	case "/arvados/v1/keep_services/accessible":
+		proxy := arvados.KeepService{
+			UUID:           s.remoteClusterID + "-bi6l4-proxyproxyproxy",
+			ServiceType:    "proxy",
+			ServiceHost:    host,
+			ServicePort:    portnum,
+			ServiceSSLFlag: false,
+		}
+		json.NewEncoder(w).Encode(arvados.KeepServiceList{
+			Items: []arvados.KeepService{proxy},
+		})
+	default:
+		http.Error(w, "404", 404)
+	}
+}
+
+// SetUpTest starts the stub remote keepproxy and (TLS) API servers,
+// loads the integration test cluster config and registers the stub
+// API server in it as remote cluster s.remoteClusterID, then installs
+// a test volume manager and a default config and builds the router
+// under test.
+//
+// The remote request counter is reset here so that the absolute
+// counts asserted by a test do not depend on which tests ran before
+// it with the same suite value.
+func (s *ProxyRemoteSuite) SetUpTest(c *check.C) {
+	s.remoteClusterID = "z0000"
+	s.remoteBlobSigningKey = []byte("3b6df6fb6518afe12922a5bc8e67bf180a358bc8")
+	atomic.StoreInt64(&s.remoteKeepRequests, 0)
+	s.remoteKeepproxy = httptest.NewServer(http.HandlerFunc(s.remoteKeepproxyHandler))
+	s.remoteAPI = httptest.NewUnstartedServer(http.HandlerFunc(s.remoteAPIHandler))
+	s.remoteAPI.StartTLS()
+	s.cluster = arvadostest.IntegrationTestCluster(c)
+	s.cluster.RemoteClusters = map[string]arvados.RemoteCluster{
+		s.remoteClusterID: {
+			// Host is the stub API server's URL without its scheme.
+			Host:     strings.Split(s.remoteAPI.URL, "//")[1],
+			Proxy:    true,
+			Scheme:   "http",
+			Insecure: true,
+		},
+	}
+	s.vm = MakeTestVolumeManager(2)
+	KeepVM = s.vm
+	theConfig = DefaultConfig()
+	theConfig.systemAuthToken = arvadostest.DataManagerToken
+	theConfig.Start()
+	s.rtr = MakeRESTRouter(s.cluster)
+}
+
+// TearDownTest closes the test volume manager, clears the KeepVM
+// global, replaces theConfig with a freshly started default config,
+// and shuts down both stub servers started by SetUpTest.
+func (s *ProxyRemoteSuite) TearDownTest(c *check.C) {
+	s.vm.Close()
+	KeepVM = nil
+	theConfig = DefaultConfig()
+	theConfig.Start()
+	s.remoteAPI.Close()
+	s.remoteKeepproxy.Close()
+}
+
+// TestProxyRemote requests a block through the local router using a
+// locator whose "+A" signature hint has been rewritten to
+// "+R<remoteClusterID>-", and checks the response and the number of
+// requests that reached the stub remote keepproxy for three tokens:
+// a valid v2 token, a non-v2 token, and a v2 token with a corrupted
+// secret.
+func (s *ProxyRemoteSuite) TestProxyRemote(c *check.C) {
+	data := []byte("foo bar")
+	s.remoteKeepData = data
+	locator := fmt.Sprintf("%x+%d", md5.Sum(data), len(data))
+	s.remoteKeepLocator = keepclient.SignLocator(locator, arvadostest.ActiveTokenV2, time.Now().Add(time.Minute), time.Minute, s.remoteBlobSigningKey)
+
+	path := "/" + strings.Replace(s.remoteKeepLocator, "+A", "+R"+s.remoteClusterID+"-", 1)
+
+	var req *http.Request
+	var resp *httptest.ResponseRecorder
+	tryWithToken := func(token string) {
+		req = httptest.NewRequest("GET", path, nil)
+		req.Header.Set("Authorization", "Bearer "+token)
+		resp = httptest.NewRecorder()
+		s.rtr.ServeHTTP(resp, req)
+	}
+	// The counter is incremented with atomic.AddInt64 in the stub
+	// server's handler goroutine, so it is read atomically here too.
+	remoteRequests := func() int64 {
+		return atomic.LoadInt64(&s.remoteKeepRequests)
+	}
+
+	// Happy path
+	tryWithToken(arvadostest.ActiveTokenV2)
+	c.Check(remoteRequests(), check.Equals, int64(1))
+	c.Check(resp.Code, check.Equals, http.StatusOK)
+	c.Check(resp.Body.String(), check.Equals, string(data))
+
+	// Obsolete token: rejected locally, remote is not contacted
+	tryWithToken(arvadostest.ActiveToken)
+	c.Check(remoteRequests(), check.Equals, int64(1))
+	c.Check(resp.Code, check.Equals, http.StatusBadRequest)
+	c.Check(resp.Body.String(), check.Not(check.Equals), string(data))
+
+	// Bad token: forwarded to the remote, which answers 404
+	tryWithToken(arvadostest.ActiveTokenV2[:len(arvadostest.ActiveTokenV2)-3] + "xxx")
+	c.Check(remoteRequests(), check.Equals, int64(2))
+	c.Check(resp.Code, check.Equals, http.StatusNotFound)
+	c.Check(resp.Body.String(), check.Not(check.Equals), string(data))
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list