[ARVADOS] updated: 1.2.0-43-gcf1f57e51
Git user
git at public.curoverse.com
Fri Sep 14 11:28:40 EDT 2018
Summary of changes:
lib/controller/federation.go | 186 +++++++++++++++++++++----------------------
lib/controller/proxy.go | 1 +
2 files changed, 94 insertions(+), 93 deletions(-)
via cf1f57e51578de2ad9121e300f5b816b74938684 (commit)
from 3539e6e39515a5e08bb2f10dc71be5841827b294 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit cf1f57e51578de2ad9121e300f5b816b74938684
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Fri Sep 14 11:27:11 2018 -0400
14087: Refactor federation search for collections by PDH
The shorter, non-PDH search case now comes first. Also adjusts the
logic that decides when to skip federation and makes it wrap the
federation endpoint routing.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/lib/controller/federation.go b/lib/controller/federation.go
index b5ce7c1a4..a18148fc5 100644
--- a/lib/controller/federation.go
+++ b/lib/controller/federation.go
@@ -188,10 +188,7 @@ func (s *searchRemoteClusterForPDH) filterRemoteClusterResponse(resp *http.Respo
// another request will find it.
// TODO collect and return error responses.
*s.errors = append(*s.errors, fmt.Sprintf("Response from %q: %v", s.remoteID, resp.Status))
- if resp.StatusCode == 404 && (*s.statusCode == 0 || *s.statusCode == 404) {
- // Only return 404 if every response is 404
- *s.statusCode = http.StatusNotFound
- } else {
+ if resp.StatusCode != 404 {
// Got a non-404 error response, convert into BadGateway
*s.statusCode = http.StatusBadGateway
}
@@ -202,7 +199,7 @@ func (s *searchRemoteClusterForPDH) filterRemoteClusterResponse(resp *http.Respo
// This reads the response body. We don't want to hold the
// lock while doing this because other remote requests could
- // also have made it to this point, and we want don't want a
+ // also have made it to this point, and we don't want a
// slow response holding the lock to block a faster response
// that is waiting on the lock.
newResponse, err = rewriteSignaturesClusterId(s.remoteID).rewriteSignatures(resp, nil)
@@ -231,106 +228,91 @@ func (s *searchRemoteClusterForPDH) filterRemoteClusterResponse(resp *http.Respo
func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
m := collectionByPDHRe.FindStringSubmatch(req.URL.Path)
- if len(m) == 2 {
- bearer := req.Header.Get("Authorization")
- if len(h.handler.Cluster.RemoteClusters) == 0 ||
- (strings.HasPrefix(bearer, "Bearer v2/") &&
- len(bearer) > 10 &&
- bearer[10:15] != h.handler.Cluster.ClusterID) {
- // Either there are no remote clusters to
- // bother searching, or we got a salted token
- // from another cluster (and it's not our job
- // to perform searches for users from other
- // clusters), so just continue down the
- // handler stack.
- h.next.ServeHTTP(w, req)
+ if len(m) != 2 {
+ // Not a collection PDH request
+ m = collectionRe.FindStringSubmatch(req.URL.Path)
+ if len(m) == 2 && m[1] != h.handler.Cluster.ClusterID {
+ // request for remote collection by uuid
+ h.handler.remoteClusterRequest(m[1], w, req,
+ rewriteSignaturesClusterId(m[1]).rewriteSignatures)
return
}
+ // not a collection UUID request, or it is a request
+ // for a local UUID, either way, continue down the
+ // handler stack.
+ h.next.ServeHTTP(w, req)
+ return
+ }
- // First, query the local cluster.
- urlOut, insecure, err := findRailsAPI(h.handler.Cluster, h.handler.NodeProfile)
- if err != nil {
- httpserver.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
+ // Request for collection by PDH. Search the federation.
- urlOut = &url.URL{
- Scheme: urlOut.Scheme,
- Host: urlOut.Host,
- Path: req.URL.Path,
- RawPath: req.URL.RawPath,
- RawQuery: req.URL.RawQuery,
- }
- client := h.handler.secureClient
- if insecure {
- client = h.handler.insecureClient
- }
- sf := &searchLocalClusterForPDH{}
- h.handler.proxy.Do(w, req, urlOut, client, sf.filterLocalClusterResponse)
- if sf.sentResponse {
- return
- }
+ // First, query the local cluster.
+ urlOut, insecure, err := findRailsAPI(h.handler.Cluster, h.handler.NodeProfile)
+ if err != nil {
+ httpserver.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
- sharedContext, cancelFunc := context.WithCancel(req.Context())
- defer cancelFunc()
- req = req.WithContext(sharedContext)
-
- // Create a goroutine for each cluster in the
- // RemoteClusters map. The first valid result gets
- // returned to the client. When that happens, all
- // other outstanding requests are cancelled or
- // suppressed.
- sentResponse := false
- mtx := sync.Mutex{}
- wg := sync.WaitGroup{}
- var errors []string
- var errorCode int = 0
-
- // use channel as a semaphore to limit it to 4
- // parallel requests at a time
- sem := make(chan bool, 4)
- defer close(sem)
- for remoteID := range h.handler.Cluster.RemoteClusters {
- // blocks until it can put a value into the
- // channel (which has a max queue capacity)
- sem <- true
- if sentResponse {
- break
- }
- search := &searchRemoteClusterForPDH{remoteID, &mtx, &sentResponse,
- &sharedContext, cancelFunc, &errors, &errorCode}
- wg.Add(1)
- go func() {
- h.handler.remoteClusterRequest(search.remoteID, w, req, search.filterRemoteClusterResponse)
- wg.Done()
- <-sem
- }()
- }
- wg.Wait()
+ urlOut = &url.URL{
+ Scheme: urlOut.Scheme,
+ Host: urlOut.Host,
+ Path: req.URL.Path,
+ RawPath: req.URL.RawPath,
+ RawQuery: req.URL.RawQuery,
+ }
+ client := h.handler.secureClient
+ if insecure {
+ client = h.handler.insecureClient
+ }
+ sf := &searchLocalClusterForPDH{}
+ h.handler.proxy.Do(w, req, urlOut, client, sf.filterLocalClusterResponse)
+ if sf.sentResponse {
+ return
+ }
+ sharedContext, cancelFunc := context.WithCancel(req.Context())
+ defer cancelFunc()
+ req = req.WithContext(sharedContext)
+
+ // Create a goroutine for each cluster in the
+ // RemoteClusters map. The first valid result gets
+ // returned to the client. When that happens, all
+ // other outstanding requests are cancelled or
+ // suppressed.
+ sentResponse := false
+ mtx := sync.Mutex{}
+ wg := sync.WaitGroup{}
+ var errors []string
+ var errorCode int = 404
+
+ // use channel as a semaphore to limit it to 4
+ // parallel requests at a time
+ sem := make(chan bool, 4)
+ defer close(sem)
+ for remoteID := range h.handler.Cluster.RemoteClusters {
+ // blocks until it can put a value into the
+ // channel (which has a max queue capacity)
+ sem <- true
if sentResponse {
- return
- }
-
- if errorCode == 0 {
- // If all connections failed/timed out on all
- // remote clusters, no status code could have
- // been returned, so make sure to set one.
- errorCode = http.StatusBadGateway
+ break
}
-
- // No successful responses, so return the error
- httpserver.Errors(w, errors, errorCode)
- return
+ search := &searchRemoteClusterForPDH{remoteID, &mtx, &sentResponse,
+ &sharedContext, cancelFunc, &errors, &errorCode}
+ wg.Add(1)
+ go func() {
+ h.handler.remoteClusterRequest(search.remoteID, w, req, search.filterRemoteClusterResponse)
+ wg.Done()
+ <-sem
+ }()
}
+ wg.Wait()
- m = collectionRe.FindStringSubmatch(req.URL.Path)
- if len(m) < 2 || m[1] == h.handler.Cluster.ClusterID {
- h.next.ServeHTTP(w, req)
+ if sentResponse {
return
}
- h.handler.remoteClusterRequest(m[1], w, req,
- rewriteSignaturesClusterId(m[1]).rewriteSignatures)
+
+ // No successful responses, so return the error
+ httpserver.Errors(w, errors, errorCode)
}
func (h *Handler) setupProxyRemoteCluster(next http.Handler) http.Handler {
@@ -341,6 +323,24 @@ func (h *Handler) setupProxyRemoteCluster(next http.Handler) http.Handler {
mux.Handle("/arvados/v1/collections/", &collectionFederatedRequestHandler{next, h})
mux.Handle("/", next)
+ return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+ parts := strings.Split(req.Header.Get("Authorization"), "/")
+ alreadySalted := (len(parts) == 3 && parts[0] == "Bearer v2" && len(parts[2]) == 40)
+
+ if alreadySalted ||
+ strings.Index(req.Header.Get("Via"), "arvados-controller") != -1 {
+ // The token is already salted, or this is a
+ // request from another instance of
+ // arvados-controller. In either case, we
+ // don't want to proxy this query, so just
+ // continue down the instance handler stack.
+ next.ServeHTTP(w, req)
+ return
+ }
+
+ mux.ServeHTTP(w, req)
+ })
+
return mux
}
diff --git a/lib/controller/proxy.go b/lib/controller/proxy.go
index 72267b8d9..0f5d43076 100644
--- a/lib/controller/proxy.go
+++ b/lib/controller/proxy.go
@@ -29,6 +29,7 @@ var dropHeaders = map[string]bool{
"TE": true,
"Trailer": true,
"Transfer-Encoding": true,
+ "Content-Encoding": true, // interferes with Go's automatic compression/decompression
"Upgrade": true,
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list