[ARVADOS] created: 1.2.0-37-gf7fe4c8c4
Git user
git at public.curoverse.com
Mon Sep 10 16:39:56 EDT 2018
at f7fe4c8c4ba93eb1746c5392a820af78c2ed2562 (commit)
commit f7fe4c8c4ba93eb1746c5392a820af78c2ed2562
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Mon Sep 10 15:28:35 2018 -0400
14087: Fetch federated collection by PDH
Implement parallel query of remotes. First response is returned to
client, other responses in flight are cancelled. Adds tests.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/lib/controller/federation.go b/lib/controller/federation.go
index 9db8270b4..286b4206c 100644
--- a/lib/controller/federation.go
+++ b/lib/controller/federation.go
@@ -7,14 +7,17 @@ package controller
import (
"bufio"
"bytes"
+ "context"
"database/sql"
"encoding/json"
"fmt"
"io/ioutil"
+ "log"
"net/http"
"net/url"
"regexp"
"strings"
+ "sync"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/auth"
@@ -62,6 +65,7 @@ func (h *Handler) remoteClusterRequest(remoteID string, w http.ResponseWriter, r
if remote.Insecure {
client = h.insecureClient
}
+ log.Printf("Remote cluster request to %v %v", remoteID, urlOut)
h.proxy.Do(w, req, urlOut, client, filter)
}
@@ -76,7 +80,11 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h
type rewriteSignaturesClusterId string
-func (clusterId rewriteSignaturesClusterId) rewriteSignatures(resp *http.Response) (newResponse *http.Response, err error) {
+func (clusterId rewriteSignaturesClusterId) rewriteSignatures(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
+ if requestError != nil {
+ return resp, requestError
+ }
+
if resp.StatusCode != 200 {
return resp, nil
}
@@ -134,22 +142,108 @@ func (clusterId rewriteSignaturesClusterId) rewriteSignatures(resp *http.Respons
}
type searchLocalClusterForPDH struct {
- needSearchFederation bool
+ sentResponse bool
}
-func (s *searchLocalClusterForPDH) filterLocalClusterResponse(resp *http.Response) (newResponse *http.Response, err error) {
+func (s *searchLocalClusterForPDH) filterLocalClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
+ if requestError != nil {
+ return resp, requestError
+ }
+
if resp.StatusCode == 404 {
// Suppress returning this result, because we want to
// search the federation.
- s.needSearchFederation = true
+ s.sentResponse = false
return nil, nil
}
+ s.sentResponse = true
return resp, nil
}
+type searchRemoteClusterForPDH struct {
+ remoteID string
+ mtx *sync.Mutex
+ sentResponse *bool
+ sharedContext *context.Context
+ cancelFunc func()
+ errors *[]string
+ statusCode *int
+}
+
+func (s *searchRemoteClusterForPDH) filterRemoteClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
+ s.mtx.Lock()
+ defer s.mtx.Unlock()
+
+ if *s.sentResponse {
+ // Another request already returned a response
+ return nil, nil
+ }
+
+ if requestError != nil {
+ *s.errors = append(*s.errors, fmt.Sprintf("Request error contacting %q: %v", s.remoteID, requestError))
+ // Record the error and suppress response
+ return nil, nil
+ }
+
+ if resp.StatusCode != 200 {
+ // Suppress returning unsuccessful result. Maybe
+ // another request will find it.
+ // TODO collect and return error responses.
+ *s.errors = append(*s.errors, fmt.Sprintf("Response from %q: %v", s.remoteID, resp.Status))
+ if resp.StatusCode == 404 && (*s.statusCode == 0 || *s.statusCode == 404) {
+ // Only return 404 if every response is 404
+ *s.statusCode = http.StatusNotFound
+ } else {
+ // Got a non-404 error response, convert into BadGateway
+ *s.statusCode = http.StatusBadGateway
+ }
+ return nil, nil
+ }
+
+ s.mtx.Unlock()
+
+ // This reads the response body. We don't want to hold the
+ // lock while doing this because other remote requests could
+ // also have made it to this point, and we don't want a
+ // slow response holding the lock to block a faster response
+ // that is waiting on the lock.
+ newResponse, err = rewriteSignaturesClusterId(s.remoteID).rewriteSignatures(resp, nil)
+
+ s.mtx.Lock()
+
+ if *s.sentResponse {
+ // Another request already returned a response
+ return nil, nil
+ }
+
+ if err != nil {
+ // Suppress returning unsuccessful result. Maybe
+ // another request will be successful.
+ *s.errors = append(*s.errors, fmt.Sprintf("Error parsing response from %q: %v", s.remoteID, err))
+ return nil, nil
+ }
+
+ // We have a successful response. Suppress/cancel all the
+ // other requests/responses.
+ *s.sentResponse = true
+ s.cancelFunc()
+
+ return newResponse, nil
+}
+
func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
m := collectionByPDHRe.FindStringSubmatch(req.URL.Path)
if len(m) == 2 {
+ bearer := req.Header.Get("Authorization")
+ if strings.HasPrefix(bearer, "Bearer v2/") &&
+ len(bearer) > 10 &&
+ bearer[10:15] != h.handler.Cluster.ClusterID {
+ // Salted token from another cluster, just
+ // fall back to query local cluster only.
+ h.next.ServeHTTP(w, req)
+ return
+ }
+
urlOut, insecure, err := findRailsAPI(h.handler.Cluster, h.handler.NodeProfile)
if err != nil {
httpserver.Error(w, err.Error(), http.StatusInternalServerError)
@@ -167,12 +261,48 @@ func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req
if insecure {
client = h.handler.insecureClient
}
- sf := &searchLocalClusterForPDH{false}
+ sf := &searchLocalClusterForPDH{}
h.handler.proxy.Do(w, req, urlOut, client, sf.filterLocalClusterResponse)
- if !sf.needSearchFederation {
- // a response was sent
+ if sf.sentResponse {
+ // a response was sent, nothing more to do
return
}
+
+ sharedContext, cancelFunc := context.WithCancel(req.Context())
+ defer cancelFunc()
+ req = req.WithContext(sharedContext)
+
+ // Create a goroutine that will contact each cluster
+ // in the RemoteClusters map. The first one to return
+ // a valid result gets returned to the client. When
+ // that happens, all other outstanding requests are
+ // cancelled or suppressed.
+ sentResponse := false
+ mtx := sync.Mutex{}
+ wg := sync.WaitGroup{}
+ var errors []string
+ var errorCode int = 0
+ for remoteID := range h.handler.Cluster.RemoteClusters {
+ search := &searchRemoteClusterForPDH{remoteID, &mtx, &sentResponse,
+ &sharedContext, cancelFunc, &errors, &errorCode}
+ wg.Add(1)
+ go func() {
+ h.handler.remoteClusterRequest(search.remoteID, w, req, search.filterRemoteClusterResponse)
+ wg.Done()
+ }()
+ }
+ wg.Wait()
+ if sentResponse {
+ return
+ }
+
+ if errorCode == 0 {
+ errorCode = http.StatusBadGateway
+ }
+
+ // No successful responses, so return an error
+ httpserver.Errors(w, errors, errorCode)
+ return
}
m = collectionRe.FindStringSubmatch(req.URL.Path)
diff --git a/lib/controller/federation_test.go b/lib/controller/federation_test.go
index 12906e2de..338a81ae6 100644
--- a/lib/controller/federation_test.go
+++ b/lib/controller/federation_test.go
@@ -315,6 +315,13 @@ func (s *FederationSuite) TestGetRemoteCollection(c *check.C) {
`)
}
+func (s *FederationSuite) TestGetRemoteCollectionError(c *check.C) {
+ req := httptest.NewRequest("GET", "/arvados/v1/collections/zzzzz-4zz18-fakefakefakefak", nil)
+ req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+ resp := s.testRequest(req)
+ c.Check(resp.StatusCode, check.Equals, http.StatusNotFound)
+}
+
func (s *FederationSuite) TestSignedLocatorPattern(c *check.C) {
// Confirm the regular expression identifies other groups of hints correctly
c.Check(keepclient.SignedLocatorRe.FindStringSubmatch(`6a4ff0499484c6c79c95cd8c566bd25f+249025+B1+C2+A05227438989d04712ea9ca1c91b556cef01d5cc7@5ba5405b+D3+E4`),
@@ -377,6 +384,71 @@ func (s *FederationSuite) TestGetRemoteCollectionByPDH(c *check.C) {
c.Check(json.NewDecoder(resp.Body).Decode(&col), check.IsNil)
c.Check(col.PortableDataHash, check.Equals, arvadostest.UserAgreementPDH)
c.Check(col.ManifestText, check.Matches,
+ `\. 6a4ff0499484c6c79c95cd8c566bd25f\+249025\+Rzzzzz-[0-9a-f]{40}@[0-9a-f]{8} 0:249025:GNU_General_Public_License,_version_3.pdf
+`)
+}
+
+func (s *FederationSuite) TestGetCollectionByPDHError(c *check.C) {
+ srv := &httpserver.Server{
+ Server: http.Server{
+ Handler: http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+ w.WriteHeader(404)
+ }),
+ },
+ }
+
+ c.Assert(srv.Start(), check.IsNil)
+ defer srv.Close()
+
+ // Point the "local" cluster at a service that always returns 404
+ np := arvados.NodeProfile{
+ Controller: arvados.SystemServiceInstance{Listen: ":"},
+ RailsAPI: arvados.SystemServiceInstance{Listen: srv.Addr,
+ TLS: false, Insecure: true}}
+ s.testHandler.Cluster.NodeProfiles["*"] = np
+ s.testHandler.NodeProfile = &np
+
+ req := httptest.NewRequest("GET", "/arvados/v1/collections/99999999999999999999999999999999+99", nil)
+ req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+
+ resp := s.testRequest(req)
+ defer resp.Body.Close()
+
+ c.Check(resp.StatusCode, check.Equals, http.StatusBadGateway)
+}
+
+func (s *FederationSuite) TestSaltedTokenGetCollectionByPDH(c *check.C) {
+ np := arvados.NodeProfile{
+ Controller: arvados.SystemServiceInstance{Listen: ":"},
+ RailsAPI: arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_TEST_API_HOST"),
+ TLS: true, Insecure: true}}
+ s.testHandler.Cluster.NodeProfiles["*"] = np
+ s.testHandler.NodeProfile = &np
+
+ req := httptest.NewRequest("GET", "/arvados/v1/collections/"+arvadostest.UserAgreementPDH, nil)
+ req.Header.Set("Authorization", "Bearer v2/zzzzz-gj3su-077z32aux8dg2s1/282d7d172b6cfdce364c5ed12ddf7417b2d00065")
+ resp := s.testRequest(req)
+
+ c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+ var col arvados.Collection
+ c.Check(json.NewDecoder(resp.Body).Decode(&col), check.IsNil)
+ c.Check(col.PortableDataHash, check.Equals, arvadostest.UserAgreementPDH)
+ c.Check(col.ManifestText, check.Matches,
`\. 6a4ff0499484c6c79c95cd8c566bd25f\+249025\+A[0-9a-f]{40}@[0-9a-f]{8} 0:249025:GNU_General_Public_License,_version_3.pdf
`)
}
+
+func (s *FederationSuite) TestSaltedTokenGetCollectionByPDHError(c *check.C) {
+ np := arvados.NodeProfile{
+ Controller: arvados.SystemServiceInstance{Listen: ":"},
+ RailsAPI: arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_TEST_API_HOST"),
+ TLS: true, Insecure: true}}
+ s.testHandler.Cluster.NodeProfiles["*"] = np
+ s.testHandler.NodeProfile = &np
+
+ req := httptest.NewRequest("GET", "/arvados/v1/collections/99999999999999999999999999999999+99", nil)
+ req.Header.Set("Authorization", "Bearer v2/zzzzz-gj3su-077z32aux8dg2s1/282d7d172b6cfdce364c5ed12ddf7417b2d00065")
+ resp := s.testRequest(req)
+
+ c.Check(resp.StatusCode, check.Equals, http.StatusNotFound)
+}
diff --git a/lib/controller/proxy.go b/lib/controller/proxy.go
index 2702c5607..72267b8d9 100644
--- a/lib/controller/proxy.go
+++ b/lib/controller/proxy.go
@@ -32,7 +32,7 @@ var dropHeaders = map[string]bool{
"Upgrade": true,
}
-type ResponseFilter func(*http.Response) (*http.Response, error)
+type ResponseFilter func(*http.Response, error) (*http.Response, error)
func (p *proxy) Do(w http.ResponseWriter,
reqIn *http.Request,
@@ -74,17 +74,19 @@ func (p *proxy) Do(w http.ResponseWriter,
}).WithContext(ctx)
resp, err := client.Do(reqOut)
- if err != nil {
+ if filter == nil && err != nil {
httpserver.Error(w, err.Error(), http.StatusBadGateway)
return
}
// make sure original response body gets closed
originalBody := resp.Body
- defer originalBody.Close()
+ if originalBody != nil {
+ defer originalBody.Close()
+ }
if filter != nil {
- resp, err = filter(resp)
+ resp, err = filter(resp, err)
if err != nil {
httpserver.Error(w, err.Error(), http.StatusBadGateway)
@@ -102,6 +104,7 @@ func (p *proxy) Do(w http.ResponseWriter,
defer resp.Body.Close()
}
}
+
for k, v := range resp.Header {
for _, v := range v {
w.Header().Add(k, v)
diff --git a/sdk/go/httpserver/error.go b/sdk/go/httpserver/error.go
index 398e61fcd..1ccf8c047 100644
--- a/sdk/go/httpserver/error.go
+++ b/sdk/go/httpserver/error.go
@@ -19,3 +19,10 @@ func Error(w http.ResponseWriter, error string, code int) {
w.WriteHeader(code)
json.NewEncoder(w).Encode(ErrorResponse{Errors: []string{error}})
}
+
+func Errors(w http.ResponseWriter, errors []string, code int) {
+ w.Header().Set("Content-Type", "application/json")
+ w.Header().Set("X-Content-Type-Options", "nosniff")
+ w.WriteHeader(code)
+ json.NewEncoder(w).Encode(ErrorResponse{Errors: errors})
+}
commit ef8b1f40a3872e6f367ba3bd42a83347fa1e3043
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Fri Sep 7 18:14:26 2018 -0400
14087: Federated fetch by PDH WIP
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/lib/controller/federation.go b/lib/controller/federation.go
index c9501d367..9db8270b4 100644
--- a/lib/controller/federation.go
+++ b/lib/controller/federation.go
@@ -24,6 +24,7 @@ import (
var wfRe = regexp.MustCompile(`^/arvados/v1/workflows/([0-9a-z]{5})-[^/]+$`)
var collectionRe = regexp.MustCompile(`^/arvados/v1/collections/([0-9a-z]{5})-[^/]+$`)
+var collectionByPDHRe = regexp.MustCompile(`^/arvados/v1/collections/([0-9a-fA-F]{32}\+[0-9]+)+$`)
type genericFederatedRequestHandler struct {
next http.Handler
@@ -132,8 +133,49 @@ func (clusterId rewriteSignaturesClusterId) rewriteSignatures(resp *http.Respons
return resp, nil
}
+type searchLocalClusterForPDH struct {
+ needSearchFederation bool
+}
+
+func (s *searchLocalClusterForPDH) filterLocalClusterResponse(resp *http.Response) (newResponse *http.Response, err error) {
+ if resp.StatusCode == 404 {
+ // Suppress returning this result, because we want to
+ // search the federation.
+ s.needSearchFederation = true
+ return nil, nil
+ }
+ return resp, nil
+}
+
func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
- m := collectionRe.FindStringSubmatch(req.URL.Path)
+ m := collectionByPDHRe.FindStringSubmatch(req.URL.Path)
+ if len(m) == 2 {
+ urlOut, insecure, err := findRailsAPI(h.handler.Cluster, h.handler.NodeProfile)
+ if err != nil {
+ httpserver.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ urlOut = &url.URL{
+ Scheme: urlOut.Scheme,
+ Host: urlOut.Host,
+ Path: req.URL.Path,
+ RawPath: req.URL.RawPath,
+ RawQuery: req.URL.RawQuery,
+ }
+ client := h.handler.secureClient
+ if insecure {
+ client = h.handler.insecureClient
+ }
+ sf := &searchLocalClusterForPDH{false}
+ h.handler.proxy.Do(w, req, urlOut, client, sf.filterLocalClusterResponse)
+ if !sf.needSearchFederation {
+ // a response was sent
+ return
+ }
+ }
+
+ m = collectionRe.FindStringSubmatch(req.URL.Path)
if len(m) < 2 || m[1] == h.handler.Cluster.ClusterID {
h.next.ServeHTTP(w, req)
return
diff --git a/lib/controller/federation_test.go b/lib/controller/federation_test.go
index f70a8981f..12906e2de 100644
--- a/lib/controller/federation_test.go
+++ b/lib/controller/federation_test.go
@@ -10,6 +10,7 @@ import (
"net/http"
"net/http/httptest"
"net/url"
+ "os"
"strings"
"time"
@@ -312,7 +313,9 @@ func (s *FederationSuite) TestGetRemoteCollection(c *check.C) {
c.Check(col.ManifestText, check.Matches,
`\. 6a4ff0499484c6c79c95cd8c566bd25f\+249025\+Rzzzzz-[0-9a-f]{40}@[0-9a-f]{8} 0:249025:GNU_General_Public_License,_version_3.pdf
`)
+}
+func (s *FederationSuite) TestSignedLocatorPattern(c *check.C) {
// Confirm the regular expression identifies other groups of hints correctly
c.Check(keepclient.SignedLocatorRe.FindStringSubmatch(`6a4ff0499484c6c79c95cd8c566bd25f+249025+B1+C2+A05227438989d04712ea9ca1c91b556cef01d5cc7@5ba5405b+D3+E4`),
check.DeepEquals,
@@ -324,3 +327,56 @@ func (s *FederationSuite) TestGetRemoteCollection(c *check.C) {
"05227438989d04712ea9ca1c91b556cef01d5cc7", "5ba5405b",
"+D3+E4", "+E4"})
}
+
+func (s *FederationSuite) TestGetLocalCollectionByPDH(c *check.C) {
+ np := arvados.NodeProfile{
+ Controller: arvados.SystemServiceInstance{Listen: ":"},
+ RailsAPI: arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_TEST_API_HOST"),
+ TLS: true, Insecure: true}}
+ s.testHandler.Cluster.NodeProfiles["*"] = np
+ s.testHandler.NodeProfile = &np
+
+ req := httptest.NewRequest("GET", "/arvados/v1/collections/"+arvadostest.UserAgreementPDH, nil)
+ req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+ resp := s.testRequest(req)
+
+ c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+ var col arvados.Collection
+ c.Check(json.NewDecoder(resp.Body).Decode(&col), check.IsNil)
+ c.Check(col.PortableDataHash, check.Equals, arvadostest.UserAgreementPDH)
+ c.Check(col.ManifestText, check.Matches,
+ `\. 6a4ff0499484c6c79c95cd8c566bd25f\+249025\+A[0-9a-f]{40}@[0-9a-f]{8} 0:249025:GNU_General_Public_License,_version_3.pdf
+`)
+}
+
+func (s *FederationSuite) TestGetRemoteCollectionByPDH(c *check.C) {
+ srv := &httpserver.Server{
+ Server: http.Server{
+ Handler: http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+ w.WriteHeader(404)
+ }),
+ },
+ }
+
+ c.Assert(srv.Start(), check.IsNil)
+ defer srv.Close()
+
+ np := arvados.NodeProfile{
+ Controller: arvados.SystemServiceInstance{Listen: ":"},
+ RailsAPI: arvados.SystemServiceInstance{Listen: srv.Addr,
+ TLS: false, Insecure: true}}
+ s.testHandler.Cluster.NodeProfiles["*"] = np
+ s.testHandler.NodeProfile = &np
+
+ req := httptest.NewRequest("GET", "/arvados/v1/collections/"+arvadostest.UserAgreementPDH, nil)
+ req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+ resp := s.testRequest(req)
+
+ c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+ var col arvados.Collection
+ c.Check(json.NewDecoder(resp.Body).Decode(&col), check.IsNil)
+ c.Check(col.PortableDataHash, check.Equals, arvadostest.UserAgreementPDH)
+ c.Check(col.ManifestText, check.Matches,
+ `\. 6a4ff0499484c6c79c95cd8c566bd25f\+249025\+A[0-9a-f]{40}@[0-9a-f]{8} 0:249025:GNU_General_Public_License,_version_3.pdf
+`)
+}
diff --git a/lib/controller/proxy.go b/lib/controller/proxy.go
index 8c643d752..2702c5607 100644
--- a/lib/controller/proxy.go
+++ b/lib/controller/proxy.go
@@ -72,6 +72,7 @@ func (p *proxy) Do(w http.ResponseWriter,
Header: hdrOut,
Body: reqIn.Body,
}).WithContext(ctx)
+
resp, err := client.Do(reqOut)
if err != nil {
httpserver.Error(w, err.Error(), http.StatusBadGateway)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list