[ARVADOS] created: 1.2.0-37-gf7fe4c8c4

Git user git at public.curoverse.com
Mon Sep 10 16:39:56 EDT 2018


        at  f7fe4c8c4ba93eb1746c5392a820af78c2ed2562 (commit)


commit f7fe4c8c4ba93eb1746c5392a820af78c2ed2562
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Mon Sep 10 15:28:35 2018 -0400

    14087: Fetch federated collection by PDH
    
    Implement parallel query of remotes when the local cluster returns
    404.  The first successful response is returned to the client;
    other requests still in flight are cancelled.  Adds tests.
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/lib/controller/federation.go b/lib/controller/federation.go
index 9db8270b4..286b4206c 100644
--- a/lib/controller/federation.go
+++ b/lib/controller/federation.go
@@ -7,14 +7,17 @@ package controller
 import (
 	"bufio"
 	"bytes"
+	"context"
 	"database/sql"
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
+	"log"
 	"net/http"
 	"net/url"
 	"regexp"
 	"strings"
+	"sync"
 
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
 	"git.curoverse.com/arvados.git/sdk/go/auth"
@@ -62,6 +65,7 @@ func (h *Handler) remoteClusterRequest(remoteID string, w http.ResponseWriter, r
 	if remote.Insecure {
 		client = h.insecureClient
 	}
+	log.Printf("Remote cluster request to %v %v", remoteID, urlOut)
 	h.proxy.Do(w, req, urlOut, client, filter)
 }
 
@@ -76,7 +80,11 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h
 
 type rewriteSignaturesClusterId string
 
-func (clusterId rewriteSignaturesClusterId) rewriteSignatures(resp *http.Response) (newResponse *http.Response, err error) {
+func (clusterId rewriteSignaturesClusterId) rewriteSignatures(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
+	if requestError != nil {
+		return resp, requestError
+	}
+
 	if resp.StatusCode != 200 {
 		return resp, nil
 	}
@@ -134,22 +142,108 @@ func (clusterId rewriteSignaturesClusterId) rewriteSignatures(resp *http.Respons
 }
 
 type searchLocalClusterForPDH struct {
-	needSearchFederation bool
+	sentResponse bool
 }
 
-func (s *searchLocalClusterForPDH) filterLocalClusterResponse(resp *http.Response) (newResponse *http.Response, err error) {
+func (s *searchLocalClusterForPDH) filterLocalClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
+	if requestError != nil {
+		return resp, requestError
+	}
+
 	if resp.StatusCode == 404 {
 		// Suppress returning this result, because we want to
 		// search the federation.
-		s.needSearchFederation = true
+		s.sentResponse = false
 		return nil, nil
 	}
+	s.sentResponse = true
 	return resp, nil
 }
 
+type searchRemoteClusterForPDH struct {
+	remoteID      string
+	mtx           *sync.Mutex
+	sentResponse  *bool
+	sharedContext *context.Context
+	cancelFunc    func()
+	errors        *[]string
+	statusCode    *int
+}
+
+func (s *searchRemoteClusterForPDH) filterRemoteClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+
+	if *s.sentResponse {
+		// Another request already returned a response
+		return nil, nil
+	}
+
+	if requestError != nil {
+		*s.errors = append(*s.errors, fmt.Sprintf("Request error contacting %q: %v", s.remoteID, requestError))
+		// Record the error and suppress response
+		return nil, nil
+	}
+
+	if resp.StatusCode != 200 {
+		// Suppress returning unsuccessful result.  Maybe
+		// another request will find it.
+		// TODO collect and return error responses.
+		*s.errors = append(*s.errors, fmt.Sprintf("Response from %q: %v", s.remoteID, resp.Status))
+		if resp.StatusCode == 404 && (*s.statusCode == 0 || *s.statusCode == 404) {
+			// Only return 404 if every response is 404
+			*s.statusCode = http.StatusNotFound
+		} else {
+			// Got a non-404 error response, convert into BadGateway
+			*s.statusCode = http.StatusBadGateway
+		}
+		return nil, nil
+	}
+
+	s.mtx.Unlock()
+
+	// This reads the response body.  We don't want to hold the
+	// lock while doing this because other remote requests could
+	// also have made it to this point, and we want don't want a
+	// slow response holding the lock to block a faster response
+	// that is waiting on the lock.
+	newResponse, err = rewriteSignaturesClusterId(s.remoteID).rewriteSignatures(resp, nil)
+
+	s.mtx.Lock()
+
+	if *s.sentResponse {
+		// Another request already returned a response
+		return nil, nil
+	}
+
+	if err != nil {
+		// Suppress returning unsuccessful result.  Maybe
+		// another request will be successful.
+		*s.errors = append(*s.errors, fmt.Sprintf("Error parsing response from %q: %v", s.remoteID, err))
+		return nil, nil
+	}
+
+	// We have a successful response.  Suppress/cancel all the
+	// other requests/responses.
+	*s.sentResponse = true
+	s.cancelFunc()
+
+	return newResponse, nil
+}
+
 func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 	m := collectionByPDHRe.FindStringSubmatch(req.URL.Path)
 	if len(m) == 2 {
+		bearer := req.Header.Get("Authorization")
+		if strings.HasPrefix(bearer, "Bearer v2/") &&
+			len(bearer) > 10 &&
+			bearer[10:15] != h.handler.Cluster.ClusterID {
+			// Salted token from another cluster, just
+			// fall back to query local cluster only.
+			h.next.ServeHTTP(w, req)
+			return
+		}
+
 		urlOut, insecure, err := findRailsAPI(h.handler.Cluster, h.handler.NodeProfile)
 		if err != nil {
 			httpserver.Error(w, err.Error(), http.StatusInternalServerError)
@@ -167,12 +261,48 @@ func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req
 		if insecure {
 			client = h.handler.insecureClient
 		}
-		sf := &searchLocalClusterForPDH{false}
+		sf := &searchLocalClusterForPDH{}
 		h.handler.proxy.Do(w, req, urlOut, client, sf.filterLocalClusterResponse)
-		if !sf.needSearchFederation {
-			// a response was sent
+		if sf.sentResponse {
+			// a response was sent, nothing more to do
 			return
 		}
+
+		sharedContext, cancelFunc := context.WithCancel(req.Context())
+		defer cancelFunc()
+		req = req.WithContext(sharedContext)
+
+		// Create a goroutine that will contact each cluster
+		// in the RemoteClusters map.  The first one to return
+		// a valid result gets returned to the client.  When
+		// that happens, all other outstanding requests are
+		// cancelled or suppressed.
+		sentResponse := false
+		mtx := sync.Mutex{}
+		wg := sync.WaitGroup{}
+		var errors []string
+		var errorCode int = 0
+		for remoteID := range h.handler.Cluster.RemoteClusters {
+			search := &searchRemoteClusterForPDH{remoteID, &mtx, &sentResponse,
+				&sharedContext, cancelFunc, &errors, &errorCode}
+			wg.Add(1)
+			go func() {
+				h.handler.remoteClusterRequest(search.remoteID, w, req, search.filterRemoteClusterResponse)
+				wg.Done()
+			}()
+		}
+		wg.Wait()
+		if sentResponse {
+			return
+		}
+
+		if errorCode == 0 {
+			errorCode = http.StatusBadGateway
+		}
+
+		// No successful responses, so return an error
+		httpserver.Errors(w, errors, errorCode)
+		return
 	}
 
 	m = collectionRe.FindStringSubmatch(req.URL.Path)
diff --git a/lib/controller/federation_test.go b/lib/controller/federation_test.go
index 12906e2de..338a81ae6 100644
--- a/lib/controller/federation_test.go
+++ b/lib/controller/federation_test.go
@@ -315,6 +315,13 @@ func (s *FederationSuite) TestGetRemoteCollection(c *check.C) {
 `)
 }
 
+func (s *FederationSuite) TestGetRemoteCollectionError(c *check.C) {
+	req := httptest.NewRequest("GET", "/arvados/v1/collections/zzzzz-4zz18-fakefakefakefak", nil)
+	req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+	resp := s.testRequest(req)
+	c.Check(resp.StatusCode, check.Equals, http.StatusNotFound)
+}
+
 func (s *FederationSuite) TestSignedLocatorPattern(c *check.C) {
 	// Confirm the regular expression identifies other groups of hints correctly
 	c.Check(keepclient.SignedLocatorRe.FindStringSubmatch(`6a4ff0499484c6c79c95cd8c566bd25f+249025+B1+C2+A05227438989d04712ea9ca1c91b556cef01d5cc7 at 5ba5405b+D3+E4`),
@@ -377,6 +384,71 @@ func (s *FederationSuite) TestGetRemoteCollectionByPDH(c *check.C) {
 	c.Check(json.NewDecoder(resp.Body).Decode(&col), check.IsNil)
 	c.Check(col.PortableDataHash, check.Equals, arvadostest.UserAgreementPDH)
 	c.Check(col.ManifestText, check.Matches,
+		`\. 6a4ff0499484c6c79c95cd8c566bd25f\+249025\+Rzzzzz-[0-9a-f]{40}@[0-9a-f]{8} 0:249025:GNU_General_Public_License,_version_3.pdf
+`)
+}
+
+func (s *FederationSuite) TestGetCollectionByPDHError(c *check.C) {
+	srv := &httpserver.Server{
+		Server: http.Server{
+			Handler: http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+				w.WriteHeader(404)
+			}),
+		},
+	}
+
+	c.Assert(srv.Start(), check.IsNil)
+	defer srv.Close()
+
+	// Make the "local" cluster to a service that always returns 404
+	np := arvados.NodeProfile{
+		Controller: arvados.SystemServiceInstance{Listen: ":"},
+		RailsAPI: arvados.SystemServiceInstance{Listen: srv.Addr,
+			TLS: false, Insecure: true}}
+	s.testHandler.Cluster.NodeProfiles["*"] = np
+	s.testHandler.NodeProfile = &np
+
+	req := httptest.NewRequest("GET", "/arvados/v1/collections/99999999999999999999999999999999+99", nil)
+	req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+
+	resp := s.testRequest(req)
+	defer resp.Body.Close()
+
+	c.Check(resp.StatusCode, check.Equals, http.StatusBadGateway)
+}
+
+func (s *FederationSuite) TestSaltedTokenGetCollectionByPDH(c *check.C) {
+	np := arvados.NodeProfile{
+		Controller: arvados.SystemServiceInstance{Listen: ":"},
+		RailsAPI: arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_TEST_API_HOST"),
+			TLS: true, Insecure: true}}
+	s.testHandler.Cluster.NodeProfiles["*"] = np
+	s.testHandler.NodeProfile = &np
+
+	req := httptest.NewRequest("GET", "/arvados/v1/collections/"+arvadostest.UserAgreementPDH, nil)
+	req.Header.Set("Authorization", "Bearer v2/zzzzz-gj3su-077z32aux8dg2s1/282d7d172b6cfdce364c5ed12ddf7417b2d00065")
+	resp := s.testRequest(req)
+
+	c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+	var col arvados.Collection
+	c.Check(json.NewDecoder(resp.Body).Decode(&col), check.IsNil)
+	c.Check(col.PortableDataHash, check.Equals, arvadostest.UserAgreementPDH)
+	c.Check(col.ManifestText, check.Matches,
 		`\. 6a4ff0499484c6c79c95cd8c566bd25f\+249025\+A[0-9a-f]{40}@[0-9a-f]{8} 0:249025:GNU_General_Public_License,_version_3.pdf
 `)
 }
+
+func (s *FederationSuite) TestSaltedTokenGetCollectionByPDHError(c *check.C) {
+	np := arvados.NodeProfile{
+		Controller: arvados.SystemServiceInstance{Listen: ":"},
+		RailsAPI: arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_TEST_API_HOST"),
+			TLS: true, Insecure: true}}
+	s.testHandler.Cluster.NodeProfiles["*"] = np
+	s.testHandler.NodeProfile = &np
+
+	req := httptest.NewRequest("GET", "/arvados/v1/collections/99999999999999999999999999999999+99", nil)
+	req.Header.Set("Authorization", "Bearer v2/zzzzz-gj3su-077z32aux8dg2s1/282d7d172b6cfdce364c5ed12ddf7417b2d00065")
+	resp := s.testRequest(req)
+
+	c.Check(resp.StatusCode, check.Equals, http.StatusNotFound)
+}
diff --git a/lib/controller/proxy.go b/lib/controller/proxy.go
index 2702c5607..72267b8d9 100644
--- a/lib/controller/proxy.go
+++ b/lib/controller/proxy.go
@@ -32,7 +32,7 @@ var dropHeaders = map[string]bool{
 	"Upgrade":           true,
 }
 
-type ResponseFilter func(*http.Response) (*http.Response, error)
+type ResponseFilter func(*http.Response, error) (*http.Response, error)
 
 func (p *proxy) Do(w http.ResponseWriter,
 	reqIn *http.Request,
@@ -74,17 +74,19 @@ func (p *proxy) Do(w http.ResponseWriter,
 	}).WithContext(ctx)
 
 	resp, err := client.Do(reqOut)
-	if err != nil {
+	if filter == nil && err != nil {
 		httpserver.Error(w, err.Error(), http.StatusBadGateway)
 		return
 	}
 
 	// make sure original response body gets closed
 	originalBody := resp.Body
-	defer originalBody.Close()
+	if originalBody != nil {
+		defer originalBody.Close()
+	}
 
 	if filter != nil {
-		resp, err = filter(resp)
+		resp, err = filter(resp, err)
 
 		if err != nil {
 			httpserver.Error(w, err.Error(), http.StatusBadGateway)
@@ -102,6 +104,7 @@ func (p *proxy) Do(w http.ResponseWriter,
 			defer resp.Body.Close()
 		}
 	}
+
 	for k, v := range resp.Header {
 		for _, v := range v {
 			w.Header().Add(k, v)
diff --git a/sdk/go/httpserver/error.go b/sdk/go/httpserver/error.go
index 398e61fcd..1ccf8c047 100644
--- a/sdk/go/httpserver/error.go
+++ b/sdk/go/httpserver/error.go
@@ -19,3 +19,10 @@ func Error(w http.ResponseWriter, error string, code int) {
 	w.WriteHeader(code)
 	json.NewEncoder(w).Encode(ErrorResponse{Errors: []string{error}})
 }
+
+func Errors(w http.ResponseWriter, errors []string, code int) {
+	w.Header().Set("Content-Type", "application/json")
+	w.Header().Set("X-Content-Type-Options", "nosniff")
+	w.WriteHeader(code)
+	json.NewEncoder(w).Encode(ErrorResponse{Errors: errors})
+}

commit ef8b1f40a3872e6f367ba3bd42a83347fa1e3043
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Fri Sep 7 18:14:26 2018 -0400

    14087: Federated fetch by PDH WIP
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/lib/controller/federation.go b/lib/controller/federation.go
index c9501d367..9db8270b4 100644
--- a/lib/controller/federation.go
+++ b/lib/controller/federation.go
@@ -24,6 +24,7 @@ import (
 
 var wfRe = regexp.MustCompile(`^/arvados/v1/workflows/([0-9a-z]{5})-[^/]+$`)
 var collectionRe = regexp.MustCompile(`^/arvados/v1/collections/([0-9a-z]{5})-[^/]+$`)
+var collectionByPDHRe = regexp.MustCompile(`^/arvados/v1/collections/([0-9a-fA-F]{32}\+[0-9]+)+$`)
 
 type genericFederatedRequestHandler struct {
 	next    http.Handler
@@ -132,8 +133,49 @@ func (clusterId rewriteSignaturesClusterId) rewriteSignatures(resp *http.Respons
 	return resp, nil
 }
 
+type searchLocalClusterForPDH struct {
+	needSearchFederation bool
+}
+
+func (s *searchLocalClusterForPDH) filterLocalClusterResponse(resp *http.Response) (newResponse *http.Response, err error) {
+	if resp.StatusCode == 404 {
+		// Suppress returning this result, because we want to
+		// search the federation.
+		s.needSearchFederation = true
+		return nil, nil
+	}
+	return resp, nil
+}
+
 func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
-	m := collectionRe.FindStringSubmatch(req.URL.Path)
+	m := collectionByPDHRe.FindStringSubmatch(req.URL.Path)
+	if len(m) == 2 {
+		urlOut, insecure, err := findRailsAPI(h.handler.Cluster, h.handler.NodeProfile)
+		if err != nil {
+			httpserver.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+
+		urlOut = &url.URL{
+			Scheme:   urlOut.Scheme,
+			Host:     urlOut.Host,
+			Path:     req.URL.Path,
+			RawPath:  req.URL.RawPath,
+			RawQuery: req.URL.RawQuery,
+		}
+		client := h.handler.secureClient
+		if insecure {
+			client = h.handler.insecureClient
+		}
+		sf := &searchLocalClusterForPDH{false}
+		h.handler.proxy.Do(w, req, urlOut, client, sf.filterLocalClusterResponse)
+		if !sf.needSearchFederation {
+			// a response was sent
+			return
+		}
+	}
+
+	m = collectionRe.FindStringSubmatch(req.URL.Path)
 	if len(m) < 2 || m[1] == h.handler.Cluster.ClusterID {
 		h.next.ServeHTTP(w, req)
 		return
diff --git a/lib/controller/federation_test.go b/lib/controller/federation_test.go
index f70a8981f..12906e2de 100644
--- a/lib/controller/federation_test.go
+++ b/lib/controller/federation_test.go
@@ -10,6 +10,7 @@ import (
 	"net/http"
 	"net/http/httptest"
 	"net/url"
+	"os"
 	"strings"
 	"time"
 
@@ -312,7 +313,9 @@ func (s *FederationSuite) TestGetRemoteCollection(c *check.C) {
 	c.Check(col.ManifestText, check.Matches,
 		`\. 6a4ff0499484c6c79c95cd8c566bd25f\+249025\+Rzzzzz-[0-9a-f]{40}@[0-9a-f]{8} 0:249025:GNU_General_Public_License,_version_3.pdf
 `)
+}
 
+func (s *FederationSuite) TestSignedLocatorPattern(c *check.C) {
 	// Confirm the regular expression identifies other groups of hints correctly
 	c.Check(keepclient.SignedLocatorRe.FindStringSubmatch(`6a4ff0499484c6c79c95cd8c566bd25f+249025+B1+C2+A05227438989d04712ea9ca1c91b556cef01d5cc7 at 5ba5405b+D3+E4`),
 		check.DeepEquals,
@@ -324,3 +327,56 @@ func (s *FederationSuite) TestGetRemoteCollection(c *check.C) {
 			"05227438989d04712ea9ca1c91b556cef01d5cc7", "5ba5405b",
 			"+D3+E4", "+E4"})
 }
+
+func (s *FederationSuite) TestGetLocalCollectionByPDH(c *check.C) {
+	np := arvados.NodeProfile{
+		Controller: arvados.SystemServiceInstance{Listen: ":"},
+		RailsAPI: arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_TEST_API_HOST"),
+			TLS: true, Insecure: true}}
+	s.testHandler.Cluster.NodeProfiles["*"] = np
+	s.testHandler.NodeProfile = &np
+
+	req := httptest.NewRequest("GET", "/arvados/v1/collections/"+arvadostest.UserAgreementPDH, nil)
+	req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+	resp := s.testRequest(req)
+
+	c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+	var col arvados.Collection
+	c.Check(json.NewDecoder(resp.Body).Decode(&col), check.IsNil)
+	c.Check(col.PortableDataHash, check.Equals, arvadostest.UserAgreementPDH)
+	c.Check(col.ManifestText, check.Matches,
+		`\. 6a4ff0499484c6c79c95cd8c566bd25f\+249025\+A[0-9a-f]{40}@[0-9a-f]{8} 0:249025:GNU_General_Public_License,_version_3.pdf
+`)
+}
+
+func (s *FederationSuite) TestGetRemoteCollectionByPDH(c *check.C) {
+	srv := &httpserver.Server{
+		Server: http.Server{
+			Handler: http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+				w.WriteHeader(404)
+			}),
+		},
+	}
+
+	c.Assert(srv.Start(), check.IsNil)
+	defer srv.Close()
+
+	np := arvados.NodeProfile{
+		Controller: arvados.SystemServiceInstance{Listen: ":"},
+		RailsAPI: arvados.SystemServiceInstance{Listen: srv.Addr,
+			TLS: false, Insecure: true}}
+	s.testHandler.Cluster.NodeProfiles["*"] = np
+	s.testHandler.NodeProfile = &np
+
+	req := httptest.NewRequest("GET", "/arvados/v1/collections/"+arvadostest.UserAgreementPDH, nil)
+	req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+	resp := s.testRequest(req)
+
+	c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+	var col arvados.Collection
+	c.Check(json.NewDecoder(resp.Body).Decode(&col), check.IsNil)
+	c.Check(col.PortableDataHash, check.Equals, arvadostest.UserAgreementPDH)
+	c.Check(col.ManifestText, check.Matches,
+		`\. 6a4ff0499484c6c79c95cd8c566bd25f\+249025\+A[0-9a-f]{40}@[0-9a-f]{8} 0:249025:GNU_General_Public_License,_version_3.pdf
+`)
+}
diff --git a/lib/controller/proxy.go b/lib/controller/proxy.go
index 8c643d752..2702c5607 100644
--- a/lib/controller/proxy.go
+++ b/lib/controller/proxy.go
@@ -72,6 +72,7 @@ func (p *proxy) Do(w http.ResponseWriter,
 		Header: hdrOut,
 		Body:   reqIn.Body,
 	}).WithContext(ctx)
+
 	resp, err := client.Do(reqOut)
 	if err != nil {
 		httpserver.Error(w, err.Error(), http.StatusBadGateway)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list