[ARVADOS] created: 1.2.0-290-g5f4b5b4f2
Git user
git at public.curoverse.com
Tue Oct 30 14:02:15 EDT 2018
at 5f4b5b4f28c4e42f6f658da943fa7c378929a016 (commit)
commit 5f4b5b4f28c4e42f6f658da943fa7c378929a016
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Mon Oct 29 17:09:08 2018 -0400
14262: Rewrite collectionFederatedRequestHandler PDH search to use channels
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/lib/controller/fed_collections.go b/lib/controller/fed_collections.go
index 8a97c25c9..88b0f95a0 100644
--- a/lib/controller/fed_collections.go
+++ b/lib/controller/fed_collections.go
@@ -159,63 +159,6 @@ type searchRemoteClusterForPDH struct {
statusCode *int
}
-func (s *searchRemoteClusterForPDH) filterRemoteClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
- s.mtx.Lock()
- defer s.mtx.Unlock()
-
- if *s.sentResponse {
- // Another request already returned a response
- return nil, nil
- }
-
- if requestError != nil {
- *s.errors = append(*s.errors, fmt.Sprintf("Request error contacting %q: %v", s.remoteID, requestError))
- // Record the error and suppress response
- return nil, nil
- }
-
- if resp.StatusCode != http.StatusOK {
- // Suppress returning unsuccessful result. Maybe
- // another request will find it.
- *s.errors = append(*s.errors, fmt.Sprintf("Response to %q from %q: %v", resp.Header.Get(httpserver.HeaderRequestID), s.remoteID, resp.Status))
- if resp.StatusCode != http.StatusNotFound {
- // Got a non-404 error response, convert into BadGateway
- *s.statusCode = http.StatusBadGateway
- }
- return nil, nil
- }
-
- s.mtx.Unlock()
-
- // This reads the response body. We don't want to hold the
- // lock while doing this because other remote requests could
- // also have made it to this point, and we don't want a
- // slow response holding the lock to block a faster response
- // that is waiting on the lock.
- newResponse, err = rewriteSignatures(s.remoteID, s.pdh, resp, nil)
-
- s.mtx.Lock()
-
- if *s.sentResponse {
- // Another request already returned a response
- return nil, nil
- }
-
- if err != nil {
- // Suppress returning unsuccessful result. Maybe
- // another request will be successful.
- *s.errors = append(*s.errors, fmt.Sprintf("Error parsing response from %q: %v", s.remoteID, err))
- return nil, nil
- }
-
- // We have a successful response. Suppress/cancel all the
- // other requests/responses.
- *s.sentResponse = true
- s.cancelFunc()
-
- return newResponse, nil
-}
-
func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method != "GET" {
// Only handle GET requests right now
@@ -263,58 +206,107 @@ func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req
return
}
- sharedContext, cancelFunc := context.WithCancel(req.Context())
- defer cancelFunc()
- req = req.WithContext(sharedContext)
-
// Create a goroutine for each cluster in the
// RemoteClusters map. The first valid result gets
// returned to the client. When that happens, all
- // other outstanding requests are cancelled or
- // suppressed.
- sentResponse := false
- mtx := sync.Mutex{}
+ // other outstanding requests are cancelled
+ sharedContext, cancelFunc := context.WithCancel(req.Context())
+ req = req.WithContext(sharedContext)
wg := sync.WaitGroup{}
- var errors []string
- var errorCode int = http.StatusNotFound
+ pdh := m[1]
+ success := make(chan *http.Response)
+ errorChan := make(chan error)
// use channel as a semaphore to limit the number of concurrent
// requests at a time
sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
+
+ defer close(errorChan)
+ defer close(success)
defer close(sem)
+ defer cancelFunc()
+
for remoteID := range h.handler.Cluster.RemoteClusters {
if remoteID == h.handler.Cluster.ClusterID {
// No need to query local cluster again
continue
}
- // blocks until it can put a value into the
- // channel (which has a max queue capacity)
- sem <- true
- if sentResponse {
- break
- }
- search := &searchRemoteClusterForPDH{m[1], remoteID, &mtx, &sentResponse,
- &sharedContext, cancelFunc, &errors, &errorCode}
+
wg.Add(1)
- go func() {
- resp, cancel, err := h.handler.remoteClusterRequest(search.remoteID, req)
- if cancel != nil {
- defer cancel()
+ go func(remote string) {
+ defer wg.Done()
+ // blocks until it can put a value into the
+ // channel (which has a max queue capacity)
+ sem <- true
+ select {
+ case <-sharedContext.Done():
+ return
+ default:
+ }
+
+ resp, _, err := h.handler.remoteClusterRequest(remote, req)
+ wasSuccess := false
+ defer func() {
+ if resp != nil && !wasSuccess {
+ resp.Body.Close()
+ }
+ }()
+ // Don't need to do anything with the cancel
+ // function returned by remoteClusterRequest
+ // because the context inherits from
+ // sharedContext, so when sharedContext is
+ // cancelled it should cancel that one as
+ // well.
+ if err != nil {
+ errorChan <- err
+ return
}
- newResp, err := search.filterRemoteClusterResponse(resp, err)
- if newResp != nil || err != nil {
- h.handler.proxy.ForwardResponse(w, newResp, err)
+ if resp.StatusCode != http.StatusOK {
+ errorChan <- HTTPError{resp.Status, resp.StatusCode}
+ return
+ }
+ select {
+ case <-sharedContext.Done():
+ return
+ default:
+ }
+
+ newResponse, err := rewriteSignatures(remote, pdh, resp, nil)
+ if err != nil {
+ errorChan <- err
+ return
+ }
+ select {
+ case <-sharedContext.Done():
+ case success <- newResponse:
+ wasSuccess = true
}
- wg.Done()
<-sem
- }()
+ }(remoteID)
}
- wg.Wait()
+ go func() {
+ wg.Wait()
+ cancelFunc()
+ }()
- if sentResponse {
- return
- }
+ var errors []string
+ errorCode := http.StatusNotFound
- // No successful responses, so return the error
- httpserver.Errors(w, errors, errorCode)
+ for {
+ select {
+ case newResp = <-success:
+ h.handler.proxy.ForwardResponse(w, newResp, nil)
+ return
+ case err := <-errorChan:
+ if httperr, ok := err.(HTTPError); ok {
+ if httperr.Code != http.StatusNotFound {
+ errorCode = http.StatusBadGateway
+ }
+ }
+ errors = append(errors, err.Error())
+ case <-sharedContext.Done():
+ httpserver.Errors(w, errors, errorCode)
+ return
+ }
+ }
}
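
For readers following the rewrite above, here is a minimal standalone sketch of the same fan-out pattern: one goroutine per remote, a buffered channel used as a semaphore to bound concurrency, the first good result delivered over a success channel, and a shared cancellable context used to shut down the remaining requests. All identifiers (queryRemote, searchFederation, the timings) are illustrative stand-ins, not the actual Arvados code.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// queryRemote stands in for remoteClusterRequest + rewriteSignatures.
func queryRemote(ctx context.Context, id string) (string, error) {
	select {
	case <-time.After(time.Duration(len(id)) * 50 * time.Millisecond):
		return "result from " + id, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// searchFederation queries every remote concurrently (bounded by the
// semaphore), returns the first success, and cancels the rest.
func searchFederation(parent context.Context, remotes []string, concurrency int) (string, error) {
	ctx, cancel := context.WithCancel(parent)
	defer cancel()

	success := make(chan string)
	errs := make(chan error)
	sem := make(chan bool, concurrency) // buffered channel used as a semaphore

	var wg sync.WaitGroup
	for _, id := range remotes {
		wg.Add(1)
		go func(id string) {
			defer wg.Done()
			sem <- true              // acquire a slot (blocks when concurrency is reached)
			defer func() { <-sem }() // release it
			if ctx.Err() != nil {
				return // another goroutine already delivered a response
			}
			res, err := queryRemote(ctx, id)
			if err != nil {
				select {
				case errs <- err:
				case <-ctx.Done():
				}
				return
			}
			select {
			case success <- res:
			case <-ctx.Done():
			}
		}(id)
	}

	// Once every worker has finished, cancel ctx so the receive loop
	// below stops waiting (same role as the wg.Wait + cancelFunc goroutine).
	go func() { wg.Wait(); cancel() }()

	// The controller accumulates every error plus an HTTP status code;
	// this sketch just keeps the most recent error.
	lastErr := errors.New("not found on any remote")
	for {
		select {
		case res := <-success:
			return res, nil
		case err := <-errs:
			lastErr = err
		case <-ctx.Done():
			return "", lastErr
		}
	}
}

func main() {
	res, err := searchFederation(context.Background(), []string{"clusterA", "clusterB", "clusterC"}, 2)
	fmt.Println(res, err)
}

The wg.Wait-then-cancel goroutine is what lets the receive loop terminate with the collected errors once every remote has failed.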
commit f8082fc2466439f4073c2123e689169f06b30b4a
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Mon Oct 29 15:36:45 2018 -0400
14262: Make sure cancel() from proxy.Do() gets called
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/lib/controller/fed_collections.go b/lib/controller/fed_collections.go
index 70dbdc3f5..8a97c25c9 100644
--- a/lib/controller/fed_collections.go
+++ b/lib/controller/fed_collections.go
@@ -34,7 +34,7 @@ func rewriteSignatures(clusterID string, expectHash string,
return resp, requestError
}
- if resp.StatusCode != 200 {
+ if resp.StatusCode != http.StatusOK {
return resp, nil
}
@@ -140,7 +140,7 @@ func filterLocalClusterResponse(resp *http.Response, requestError error) (newRes
return resp, requestError
}
- if resp.StatusCode == 404 {
+ if resp.StatusCode == http.StatusNotFound {
// Suppress returning this result, because we want to
// search the federation.
return nil, nil
@@ -174,12 +174,11 @@ func (s *searchRemoteClusterForPDH) filterRemoteClusterResponse(resp *http.Respo
return nil, nil
}
- if resp.StatusCode != 200 {
+ if resp.StatusCode != http.StatusOK {
// Suppress returning unsuccessful result. Maybe
// another request will find it.
- // TODO collect and return error responses.
- *s.errors = append(*s.errors, fmt.Sprintf("Response to %q from %q: %v", httpserver.GetRequestID(resp.Header), s.remoteID, resp.Status))
- if resp.StatusCode != 404 {
+ *s.errors = append(*s.errors, fmt.Sprintf("Response to %q from %q: %v", resp.Header.Get(httpserver.HeaderRequestID), s.remoteID, resp.Status))
+ if resp.StatusCode != http.StatusNotFound {
// Got a non-404 error response, convert into BadGateway
*s.statusCode = http.StatusBadGateway
}
@@ -236,7 +235,10 @@ func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req
if clusterId != "" && clusterId != h.handler.Cluster.ClusterID {
// request for remote collection by uuid
- resp, err := h.handler.remoteClusterRequest(clusterId, req)
+ resp, cancel, err := h.handler.remoteClusterRequest(clusterId, req)
+ if cancel != nil {
+ defer cancel()
+ }
newResponse, err := rewriteSignatures(clusterId, "", resp, err)
h.handler.proxy.ForwardResponse(w, newResponse, err)
return
@@ -251,7 +253,10 @@ func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req
// Request for collection by PDH. Search the federation.
// First, query the local cluster.
- resp, err := h.handler.localClusterRequest(req)
+ resp, localClusterRequestCancel, err := h.handler.localClusterRequest(req)
+ if localClusterRequestCancel != nil {
+ defer localClusterRequestCancel()
+ }
newResp, err := filterLocalClusterResponse(resp, err)
if newResp != nil || err != nil {
h.handler.proxy.ForwardResponse(w, newResp, err)
@@ -271,7 +276,7 @@ func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req
mtx := sync.Mutex{}
wg := sync.WaitGroup{}
var errors []string
- var errorCode int = 404
+ var errorCode int = http.StatusNotFound
// use channel as a semaphore to limit the number of concurrent
// requests at a time
@@ -292,7 +297,10 @@ func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req
&sharedContext, cancelFunc, &errors, &errorCode}
wg.Add(1)
go func() {
- resp, err := h.handler.remoteClusterRequest(search.remoteID, req)
+ resp, cancel, err := h.handler.remoteClusterRequest(search.remoteID, req)
+ if cancel != nil {
+ defer cancel()
+ }
newResp, err := search.filterRemoteClusterResponse(resp, err)
if newResp != nil || err != nil {
h.handler.proxy.ForwardResponse(w, newResp, err)
diff --git a/lib/controller/fed_containers.go b/lib/controller/fed_containers.go
index ccb2401bb..a3c292583 100644
--- a/lib/controller/fed_containers.go
+++ b/lib/controller/fed_containers.go
@@ -95,7 +95,10 @@ func remoteContainerRequestCreate(
req.ContentLength = int64(buf.Len())
req.Header.Set("Content-Length", fmt.Sprintf("%v", buf.Len()))
- resp, err := h.handler.remoteClusterRequest(*clusterId, req)
+ resp, cancel, err := h.handler.remoteClusterRequest(*clusterId, req)
+ if cancel != nil {
+ defer cancel()
+ }
h.handler.proxy.ForwardResponse(w, resp, err)
return true
}
diff --git a/lib/controller/fed_generic.go b/lib/controller/fed_generic.go
index 63e61e690..7d5b63d31 100644
--- a/lib/controller/fed_generic.go
+++ b/lib/controller/fed_generic.go
@@ -6,6 +6,7 @@ package controller
import (
"bytes"
+ "context"
"encoding/json"
"fmt"
"io/ioutil"
@@ -65,12 +66,16 @@ func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter,
rc := multiClusterQueryResponseCollector{clusterID: clusterID}
var resp *http.Response
+ var cancel context.CancelFunc
if clusterID == h.handler.Cluster.ClusterID {
- resp, err = h.handler.localClusterRequest(&remoteReq)
+ resp, cancel, err = h.handler.localClusterRequest(&remoteReq)
} else {
- resp, err = h.handler.remoteClusterRequest(clusterID, &remoteReq)
+ resp, cancel, err = h.handler.remoteClusterRequest(clusterID, &remoteReq)
}
rc.collectResponse(resp, err)
+ if cancel != nil {
+ cancel()
+ }
if rc.error != nil {
return nil, "", rc.error
@@ -304,7 +309,10 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h
if clusterId == "" || clusterId == h.handler.Cluster.ClusterID {
h.next.ServeHTTP(w, req)
} else {
- resp, err := h.handler.remoteClusterRequest(clusterId, req)
+ resp, cancel, err := h.handler.remoteClusterRequest(clusterId, req)
+ if cancel != nil {
+ defer cancel()
+ }
h.handler.proxy.ForwardResponse(w, resp, err)
}
}
diff --git a/lib/controller/federation.go b/lib/controller/federation.go
index dc0aa908c..0e016f301 100644
--- a/lib/controller/federation.go
+++ b/lib/controller/federation.go
@@ -6,6 +6,7 @@ package controller
import (
"bytes"
+ "context"
"database/sql"
"encoding/json"
"fmt"
@@ -28,10 +29,10 @@ var containerRequestsRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "container
var collectionRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "collections", "4zz18"))
var collectionByPDHRe = regexp.MustCompile(`^/arvados/v1/collections/([0-9a-fA-F]{32}\+[0-9]+)+$`)
-func (h *Handler) remoteClusterRequest(remoteID string, req *http.Request) (*http.Response, error) {
+func (h *Handler) remoteClusterRequest(remoteID string, req *http.Request) (*http.Response, context.CancelFunc, error) {
remote, ok := h.Cluster.RemoteClusters[remoteID]
if !ok {
- return nil, HTTPError{fmt.Sprintf("no proxy available for cluster %v", remoteID), http.StatusNotFound}
+ return nil, nil, HTTPError{fmt.Sprintf("no proxy available for cluster %v", remoteID), http.StatusNotFound}
}
scheme := remote.Scheme
if scheme == "" {
@@ -39,7 +40,7 @@ func (h *Handler) remoteClusterRequest(remoteID string, req *http.Request) (*htt
}
saltedReq, err := h.saltAuthToken(req, remoteID)
if err != nil {
- return nil, err
+ return nil, nil, err
}
urlOut := &url.URL{
Scheme: scheme,
@@ -52,7 +53,7 @@ func (h *Handler) remoteClusterRequest(remoteID string, req *http.Request) (*htt
if remote.Insecure {
client = h.insecureClient
}
- return h.proxy.ForwardRequest(saltedReq, urlOut, client)
+ return h.proxy.Do(saltedReq, urlOut, client)
}
// Buffer request body, parse form parameters in request, and then
diff --git a/lib/controller/federation_test.go b/lib/controller/federation_test.go
index 7842ad05d..f6bfca302 100644
--- a/lib/controller/federation_test.go
+++ b/lib/controller/federation_test.go
@@ -94,8 +94,8 @@ func (s *FederationSuite) SetUpTest(c *check.C) {
func (s *FederationSuite) remoteMockHandler(w http.ResponseWriter, req *http.Request) {
b := &bytes.Buffer{}
io.Copy(b, req.Body)
- req.Body = ioutil.NopCloser(b)
req.Body.Close()
+ req.Body = ioutil.NopCloser(b)
s.remoteMockRequests = append(s.remoteMockRequests, *req)
}
diff --git a/lib/controller/handler.go b/lib/controller/handler.go
index 5e9012949..cbfaaddab 100644
--- a/lib/controller/handler.go
+++ b/lib/controller/handler.go
@@ -5,6 +5,7 @@
package controller
import (
+ "context"
"database/sql"
"errors"
"net"
@@ -121,10 +122,10 @@ func prepend(next http.Handler, middleware middlewareFunc) http.Handler {
})
}
-func (h *Handler) localClusterRequest(req *http.Request) (*http.Response, error) {
+func (h *Handler) localClusterRequest(req *http.Request) (*http.Response, context.CancelFunc, error) {
urlOut, insecure, err := findRailsAPI(h.Cluster, h.NodeProfile)
if err != nil {
- return nil, err
+ return nil, nil, err
}
urlOut = &url.URL{
Scheme: urlOut.Scheme,
@@ -137,11 +138,14 @@ func (h *Handler) localClusterRequest(req *http.Request) (*http.Response, error)
if insecure {
client = h.insecureClient
}
- return h.proxy.ForwardRequest(req, urlOut, client)
+ return h.proxy.Do(req, urlOut, client)
}
func (h *Handler) proxyRailsAPI(w http.ResponseWriter, req *http.Request, next http.Handler) {
- resp, err := h.localClusterRequest(req)
+ resp, cancel, err := h.localClusterRequest(req)
+ if cancel != nil {
+ defer cancel()
+ }
n, err := h.proxy.ForwardResponse(w, resp, err)
if err != nil {
httpserver.Logger(req).WithError(err).WithField("bytesCopied", n).Error("error copying response body")
diff --git a/lib/controller/proxy.go b/lib/controller/proxy.go
index b7f3c4f72..c89b9b36a 100644
--- a/lib/controller/proxy.go
+++ b/lib/controller/proxy.go
@@ -45,11 +45,11 @@ var dropHeaders = map[string]bool{
type ResponseFilter func(*http.Response, error) (*http.Response, error)
-// Forward a request to downstream service, and return response or error.
-func (p *proxy) ForwardRequest(
+// Forward a request to upstream service, and return response or error.
+func (p *proxy) Do(
reqIn *http.Request,
urlOut *url.URL,
- client *http.Client) (*http.Response, error) {
+ client *http.Client) (*http.Response, context.CancelFunc, error) {
// Copy headers from incoming request, then add/replace proxy
// headers like Via and X-Forwarded-For.
@@ -70,8 +70,9 @@ func (p *proxy) ForwardRequest(
hdrOut.Add("Via", reqIn.Proto+" arvados-controller")
ctx := reqIn.Context()
+ var cancel context.CancelFunc
if p.RequestTimeout > 0 {
- ctx, _ = context.WithDeadline(ctx, time.Now().Add(time.Duration(p.RequestTimeout)))
+ ctx, cancel = context.WithDeadline(ctx, time.Now().Add(time.Duration(p.RequestTimeout)))
}
reqOut := (&http.Request{
@@ -82,10 +83,11 @@ func (p *proxy) ForwardRequest(
Body: reqIn.Body,
}).WithContext(ctx)
- return client.Do(reqOut)
+ resp, err := client.Do(reqOut)
+ return resp, cancel, err
}
-// Copy a response (or error) to the upstream client
+// Copy a response (or error) to the downstream client
func (p *proxy) ForwardResponse(w http.ResponseWriter, resp *http.Response, err error) (int64, error) {
if err != nil {
if he, ok := err.(HTTPError); ok {
diff --git a/sdk/go/httpserver/id_generator.go b/sdk/go/httpserver/id_generator.go
index 6093a8a7b..14d89873b 100644
--- a/sdk/go/httpserver/id_generator.go
+++ b/sdk/go/httpserver/id_generator.go
@@ -12,6 +12,10 @@ import (
"time"
)
+const (
+ HeaderRequestID = "X-Request-Id"
+)
+
// IDGenerator generates alphanumeric strings suitable for use as
// unique IDs (a given IDGenerator will never return the same ID
// twice).
@@ -44,16 +48,12 @@ func (g *IDGenerator) Next() string {
func AddRequestIDs(h http.Handler) http.Handler {
gen := &IDGenerator{Prefix: "req-"}
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
- if req.Header.Get("X-Request-Id") == "" {
+ if req.Header.Get(HeaderRequestID) == "" {
if req.Header == nil {
req.Header = http.Header{}
}
- req.Header.Set("X-Request-Id", gen.Next())
+ req.Header.Set(HeaderRequestID, gen.Next())
}
h.ServeHTTP(w, req)
})
}
-
-func GetRequestID(h http.Header) string {
- return h.Get("X-Request-Id")
-}
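
The thread running through this commit is that proxy.Do now creates the deadline context itself and hands the context.CancelFunc back, so each caller can defer it once the response has been consumed. A hedged sketch of that contract using plain net/http (doRequest and the 5-second timeout are illustrative, not the Arvados API):

package main

import (
	"context"
	"fmt"
	"net/http"
	"time"
)

// doRequest mirrors the new proxy.Do contract: it derives a deadline
// context for the outgoing request and returns the CancelFunc to the
// caller instead of discarding it, so the timer is released exactly
// once the caller is finished with the response.
func doRequest(req *http.Request, timeout time.Duration) (*http.Response, context.CancelFunc, error) {
	ctx := req.Context()
	var cancel context.CancelFunc
	if timeout > 0 {
		ctx, cancel = context.WithDeadline(ctx, time.Now().Add(timeout))
	}
	resp, err := http.DefaultClient.Do(req.WithContext(ctx))
	return resp, cancel, err
}

func main() {
	req, _ := http.NewRequest("GET", "https://example.com/", nil)
	resp, cancel, err := doRequest(req, 5*time.Second)
	if cancel != nil {
		// Deferring the cancel (rather than discarding it, as the old
		// ForwardRequest did) frees the deadline timer and its resources.
		defer cancel()
	}
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status)
}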
commit 9f1d3097cada582af1d516c7505594080cd10b39
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Mon Oct 29 14:56:26 2018 -0400
14262: Only allow unknown PDH for images when there are remote_hosts
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/services/api/app/models/collection.rb b/services/api/app/models/collection.rb
index 718ffc0d0..487043ee3 100644
--- a/services/api/app/models/collection.rb
+++ b/services/api/app/models/collection.rb
@@ -496,7 +496,14 @@ class Collection < ArvadosModel
if loc = Keep::Locator.parse(search_term)
loc.strip_hints!
coll_match = readable_by(*readers).where(portable_data_hash: loc.to_s).limit(1)
- return get_compatible_images(readers, pattern, coll_match)
+ if coll_match.any? or Rails.configuration.remote_hosts.length == 0
+ return get_compatible_images(readers, pattern, coll_match)
+ else
+ # Allow bare pdh that doesn't exist in the local database so
+ # that federated container requests which refer to remotely
+ # stored containers will validate.
+ return [Collection.new(portable_data_hash: loc.to_s)]
+ end
end
if search_tag.nil? and (n = search_term.index(":"))
diff --git a/services/api/app/models/container.rb b/services/api/app/models/container.rb
index e469a49be..0d8453174 100644
--- a/services/api/app/models/container.rb
+++ b/services/api/app/models/container.rb
@@ -248,15 +248,6 @@ class Container < ArvadosModel
def self.resolve_container_image(container_image)
coll = Collection.for_latest_docker_image(container_image)
if !coll
- if loc = Keep::Locator.parse(container_image)
- loc.strip_hints!
- if !Collection.readable_by(current_user).where(portable_data_hash: loc.to_s).any?
- # Allow bare pdh that doesn't exist in the local database so
- # that federated container requests which refer to remotely
- # stored containers will validate.
- return loc.to_s
- end
- end
raise ArvadosModel::UnresolvableContainerError.new "docker image #{container_image.inspect} not found"
end
coll.portable_data_hash
diff --git a/services/api/test/unit/container_request_test.rb b/services/api/test/unit/container_request_test.rb
index e4c3399c4..0fafb9903 100644
--- a/services/api/test/unit/container_request_test.rb
+++ b/services/api/test/unit/container_request_test.rb
@@ -500,7 +500,8 @@ class ContainerRequestTest < ActiveSupport::TestCase
end
end
- ['ENOEXIST',
+ ['acbd18db4cc2f85cedef654fccc4a4d8+3',
+ 'ENOEXIST',
'arvados/apitestfixture:ENOEXIST',
].each do |img|
test "container_image_for_container(#{img.inspect}) => 422" do
@@ -511,6 +512,12 @@ class ContainerRequestTest < ActiveSupport::TestCase
end
end
+ test "allow unrecognized container when there are remote_hosts" do
+ set_user_from_auth :active
+ Rails.configuration.remote_hosts = {"foooo" => "bar.com"}
+ Container.resolve_container_image('acbd18db4cc2f85cedef654fccc4a4d8+3')
+ end
+
test "migrated docker image" do
Rails.configuration.docker_image_formats = ['v2']
add_docker19_migration_link
commit 85558cf6051a9e79f8b32a6c414c4bcd667acbfb
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Wed Oct 24 15:30:34 2018 -0400
14262: Fix logic for when to allow unknown PDH for containers
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/services/api/app/models/container.rb b/services/api/app/models/container.rb
index 5d46ac29f..e469a49be 100644
--- a/services/api/app/models/container.rb
+++ b/services/api/app/models/container.rb
@@ -250,7 +250,7 @@ class Container < ArvadosModel
if !coll
if loc = Keep::Locator.parse(container_image)
loc.strip_hints!
- if !Collection.where(portable_data_hash: loc.to_s).any?
+ if !Collection.readable_by(current_user).where(portable_data_hash: loc.to_s).any?
# Allow bare pdh that doesn't exist in the local database so
# that federated container requests which refer to remotely
# stored containers will validate.
commit 5bb2cfd58200beefe0b80ef27e6ac6d40a17d6a2
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Wed Oct 24 14:44:25 2018 -0400
14198: Fix error responses in container POST
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/lib/controller/fed_containers.go b/lib/controller/fed_containers.go
index e8cc739b0..ccb2401bb 100644
--- a/lib/controller/fed_containers.go
+++ b/lib/controller/fed_containers.go
@@ -30,11 +30,18 @@ func remoteContainerRequestCreate(
return false
}
- defer req.Body.Close()
+ if req.Header.Get("Content-Type") != "application/json" {
+ httpserver.Error(w, "Expected Content-Type: application/json, got "+req.Header.Get("Content-Type"), http.StatusBadRequest)
+ return true
+ }
+
+ originalBody := req.Body
+ defer originalBody.Close()
var request map[string]interface{}
err := json.NewDecoder(req.Body).Decode(&request)
if err != nil {
- return false
+ httpserver.Error(w, err.Error(), http.StatusBadRequest)
+ return true
}
crString, ok := request["container_request"].(string)
@@ -42,7 +49,8 @@ func remoteContainerRequestCreate(
var crJson map[string]interface{}
err := json.Unmarshal([]byte(crString), &crJson)
if err != nil {
- return false
+ httpserver.Error(w, err.Error(), http.StatusBadRequest)
+ return true
}
request["container_request"] = crJson
@@ -50,7 +58,8 @@ func remoteContainerRequestCreate(
containerRequest, ok := request["container_request"].(map[string]interface{})
if !ok {
- return false
+ // Use toplevel object as the container_request object
+ containerRequest = request
}
// If runtime_token is not set, create a new token
@@ -68,7 +77,8 @@ func remoteContainerRequestCreate(
}
if len(currentUser.Authorization.Scopes) != 1 || currentUser.Authorization.Scopes[0] != "all" {
- return false
+ httpserver.Error(w, "Token scope is not [all]", http.StatusForbidden)
+ return true
}
newtok, err := h.handler.createAPItoken(req, currentUser.UUID, nil)
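
The fix above relies on the delegate convention: a handler returns true when it has fully handled the request (including writing an error response) and false to fall through to the next handler, so malformed input now yields an explicit 4xx instead of silently falling through. A small sketch of that convention with plain net/http; the route and names are illustrative:

package main

import (
	"encoding/json"
	"log"
	"net/http"
)

// A delegate reports true when it has fully handled (and responded to)
// the request, and false to fall through to the next handler, mirroring
// the federatedRequestDelegate convention in the controller.
type delegate func(w http.ResponseWriter, req *http.Request) bool

func createDelegate(w http.ResponseWriter, req *http.Request) bool {
	if req.Method != "POST" {
		return false // not ours; let the next handler deal with it
	}
	if req.Header.Get("Content-Type") != "application/json" {
		// Before the fix, cases like this returned false and fell
		// through silently; now the client gets an explicit error.
		http.Error(w, "Expected Content-Type: application/json, got "+req.Header.Get("Content-Type"), http.StatusBadRequest)
		return true
	}
	var body map[string]interface{}
	if err := json.NewDecoder(req.Body).Decode(&body); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return true
	}
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(body)
	return true
}

func main() {
	var d delegate = createDelegate
	http.HandleFunc("/arvados/v1/container_requests", func(w http.ResponseWriter, req *http.Request) {
		if !d(w, req) {
			http.NotFound(w, req) // stand-in for the "next" handler
		}
	})
	log.Fatal(http.ListenAndServe(":8080", nil))
}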
commit bd8433be65966bb43fe23df7950c52c1285dc6bf
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Tue Oct 23 14:30:39 2018 -0400
14262: Handle container_request posted as a string parameter
Needs to be parsed as JSON a second time (this is how the Ruby 'arv' client submits it, unfortunately).
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/lib/controller/fed_collections.go b/lib/controller/fed_collections.go
index 62f98367c..70dbdc3f5 100644
--- a/lib/controller/fed_collections.go
+++ b/lib/controller/fed_collections.go
@@ -178,7 +178,7 @@ func (s *searchRemoteClusterForPDH) filterRemoteClusterResponse(resp *http.Respo
// Suppress returning unsuccessful result. Maybe
// another request will find it.
// TODO collect and return error responses.
- *s.errors = append(*s.errors, fmt.Sprintf("Response from %q: %v", s.remoteID, resp.Status))
+ *s.errors = append(*s.errors, fmt.Sprintf("Response to %q from %q: %v", httpserver.GetRequestID(resp.Header), s.remoteID, resp.Status))
if resp.StatusCode != 404 {
// Got a non-404 error response, convert into BadGateway
*s.statusCode = http.StatusBadGateway
diff --git a/lib/controller/fed_containers.go b/lib/controller/fed_containers.go
index 32ae25fc4..e8cc739b0 100644
--- a/lib/controller/fed_containers.go
+++ b/lib/controller/fed_containers.go
@@ -33,10 +33,23 @@ func remoteContainerRequestCreate(
defer req.Body.Close()
var request map[string]interface{}
err := json.NewDecoder(req.Body).Decode(&request)
+ if err != nil {
+ return false
+ }
+
+ crString, ok := request["container_request"].(string)
+ if ok {
+ var crJson map[string]interface{}
+ err := json.Unmarshal([]byte(crString), &crJson)
+ if err != nil {
+ return false
+ }
+
+ request["container_request"] = crJson
+ }
containerRequest, ok := request["container_request"].(map[string]interface{})
if !ok {
- log.Printf("wah wah")
return false
}
diff --git a/sdk/go/httpserver/id_generator.go b/sdk/go/httpserver/id_generator.go
index 6452136d8..6093a8a7b 100644
--- a/sdk/go/httpserver/id_generator.go
+++ b/sdk/go/httpserver/id_generator.go
@@ -53,3 +53,7 @@ func AddRequestIDs(h http.Handler) http.Handler {
h.ServeHTTP(w, req)
})
}
+
+func GetRequestID(h http.Header) string {
+ return h.Get("X-Request-Id")
+}
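
The workaround in fed_containers.go handles clients that post container_request as a JSON-encoded string rather than an object: after the first decode, a string-valued field gets decoded a second time and substituted back. A standalone sketch of that double decode (the payload shape follows the commit; everything else is illustrative):

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// The Ruby 'arv' client sends the nested object as a JSON string.
	payload := []byte(`{"container_request": "{\"name\": \"hello world\", \"state\": \"Uncommitted\"}"}`)

	var request map[string]interface{}
	if err := json.Unmarshal(payload, &request); err != nil {
		panic(err)
	}

	// If container_request decoded to a string, decode it a second time
	// and put the resulting object back into the request map.
	if crString, ok := request["container_request"].(string); ok {
		var crJSON map[string]interface{}
		if err := json.Unmarshal([]byte(crString), &crJSON); err != nil {
			panic(err)
		}
		request["container_request"] = crJSON
	}

	fmt.Printf("%#v\n", request["container_request"])
	// map[string]interface {}{"name":"hello world", "state":"Uncommitted"}
}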
commit 633c03c0dd1bbc5413e4fa8203045fcfb8eaf6db
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Tue Oct 23 13:29:53 2018 -0400
14262: Fix crunch-run tests
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index eb4f220e2..2f254b5bd 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/services/crunch-run/crunchrun_test.go
@@ -443,6 +443,10 @@ func (s *TestSuite) TestLoadImage(c *C) {
cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
c.Assert(err, IsNil)
+ cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, error) {
+ return &ArvTestClient{}, kc, nil
+ }
+
_, err = cr.Docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
c.Check(err, IsNil)
@@ -488,6 +492,9 @@ func (ArvErrorTestClient) Create(resourceType string,
}
func (ArvErrorTestClient) Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) error {
+ if method == "GET" && resourceType == "containers" && action == "auth" {
+ return nil
+ }
return errors.New("ArvError")
}
@@ -548,9 +555,13 @@ func (s *TestSuite) TestLoadImageArvError(c *C) {
// (1) Arvados error
kc := &KeepTestClient{}
defer kc.Close()
- cr, err := NewContainerRunner(s.client, ArvErrorTestClient{}, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, &ArvErrorTestClient{}, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
c.Assert(err, IsNil)
+
cr.Container.ContainerImage = hwPDH
+ cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, error) {
+ return &ArvErrorTestClient{}, &KeepTestClient{}, nil
+ }
err = cr.LoadImage()
c.Check(err.Error(), Equals, "While getting container image collection: ArvError")
@@ -558,9 +569,13 @@ func (s *TestSuite) TestLoadImageArvError(c *C) {
func (s *TestSuite) TestLoadImageKeepError(c *C) {
// (2) Keep error
- cr, err := NewContainerRunner(s.client, &ArvTestClient{}, &KeepErrorTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ kc := &KeepErrorTestClient{}
+ cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
c.Assert(err, IsNil)
cr.Container.ContainerImage = hwPDH
+ cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, error) {
+ return &ArvTestClient{}, kc, nil
+ }
err = cr.LoadImage()
c.Assert(err, NotNil)
@@ -569,9 +584,13 @@ func (s *TestSuite) TestLoadImageKeepError(c *C) {
func (s *TestSuite) TestLoadImageCollectionError(c *C) {
// (3) Collection doesn't contain image
- cr, err := NewContainerRunner(s.client, &ArvTestClient{}, &KeepReadErrorTestClient{}, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ kc := &KeepReadErrorTestClient{}
+ cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
c.Assert(err, IsNil)
cr.Container.ContainerImage = otherPDH
+ cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, error) {
+ return &ArvTestClient{}, kc, nil
+ }
err = cr.LoadImage()
c.Check(err.Error(), Equals, "First file in the container image collection does not end in .tar")
@@ -579,9 +598,13 @@ func (s *TestSuite) TestLoadImageCollectionError(c *C) {
func (s *TestSuite) TestLoadImageKeepReadError(c *C) {
// (4) Collection doesn't contain image
- cr, err := NewContainerRunner(s.client, &ArvTestClient{}, &KeepReadErrorTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ kc := &KeepReadErrorTestClient{}
+ cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
c.Assert(err, IsNil)
cr.Container.ContainerImage = hwPDH
+ cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, error) {
+ return &ArvTestClient{}, kc, nil
+ }
err = cr.LoadImage()
c.Check(err, NotNil)
@@ -629,6 +652,10 @@ func (s *TestSuite) TestRunContainer(c *C) {
cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
c.Assert(err, IsNil)
+ cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, error) {
+ return &ArvTestClient{}, kc, nil
+ }
+
var logs TestLogs
cr.NewLogWriter = logs.NewTestLoggingWriter
cr.Container.ContainerImage = hwPDH
@@ -772,8 +799,8 @@ func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, exi
}
return d, err
}
- cr.MkArvClient = func(token string) (IArvadosClient, error) {
- return &ArvTestClient{secretMounts: secretMounts}, nil
+ cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, error) {
+ return &ArvTestClient{secretMounts: secretMounts}, &KeepTestClient{}, nil
}
if extraMounts != nil && len(extraMounts) > 0 {
@@ -1069,8 +1096,8 @@ func (s *TestSuite) testStopContainer(c *C, setup func(cr *ContainerRunner)) {
cr, err := NewContainerRunner(s.client, api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
c.Assert(err, IsNil)
cr.RunArvMount = func([]string, string) (*exec.Cmd, error) { return nil, nil }
- cr.MkArvClient = func(token string) (IArvadosClient, error) {
- return &ArvTestClient{}, nil
+ cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, error) {
+ return &ArvTestClient{}, &KeepTestClient{}, nil
}
setup(cr)
@@ -1553,8 +1580,8 @@ func (s *TestSuite) stdoutErrorRunHelper(c *C, record string, fn func(t *TestDoc
c.Assert(err, IsNil)
am := &ArvMountCmdLine{}
cr.RunArvMount = am.ArvMountTest
- cr.MkArvClient = func(token string) (IArvadosClient, error) {
- return &ArvTestClient{}, nil
+ cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, error) {
+ return &ArvTestClient{}, &KeepTestClient{}, nil
}
err = cr.Run()
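
All of these test changes follow one pattern: ContainerRunner keeps its client constructor in the MkArvClient field, and each test overrides that field to return stubs instead of real clients. A minimal sketch of the injection pattern, with simplified stand-ins for the IArvadosClient interface and the runner:

package main

import "fmt"

// ArvClient is a simplified stand-in for the IArvadosClient interface.
type ArvClient interface {
	Call(method, resource string) error
}

type realClient struct{}

func (realClient) Call(method, resource string) error { return fmt.Errorf("would hit the real API") }

type stubClient struct{}

func (stubClient) Call(method, resource string) error { return nil }

// Runner keeps the client constructor in a field so tests can replace it.
type Runner struct {
	MkArvClient func(token string) (ArvClient, error)
}

func NewRunner() *Runner {
	return &Runner{
		MkArvClient: func(token string) (ArvClient, error) { return realClient{}, nil },
	}
}

func (r *Runner) LoadImage() error {
	client, err := r.MkArvClient("some-token")
	if err != nil {
		return err
	}
	return client.Call("GET", "collections")
}

func main() {
	r := NewRunner()
	// In a test, swap the factory for a stub, as the crunch-run tests do.
	r.MkArvClient = func(token string) (ArvClient, error) { return stubClient{}, nil }
	fmt.Println(r.LoadImage()) // <nil>
}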
commit 401676eeb67cbccc8b5bfec1d273d816792ca57b
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Tue Oct 23 09:26:48 2018 -0400
14262: Missing file
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/lib/controller/fed_containers.go b/lib/controller/fed_containers.go
new file mode 100644
index 000000000..32ae25fc4
--- /dev/null
+++ b/lib/controller/fed_containers.go
@@ -0,0 +1,78 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package controller
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "log"
+ "net/http"
+
+ "git.curoverse.com/arvados.git/sdk/go/auth"
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
+)
+
+func remoteContainerRequestCreate(
+ h *genericFederatedRequestHandler,
+ effectiveMethod string,
+ clusterId *string,
+ uuid string,
+ remainder string,
+ w http.ResponseWriter,
+ req *http.Request) bool {
+
+ if effectiveMethod != "POST" || uuid != "" || remainder != "" ||
+ *clusterId == "" || *clusterId == h.handler.Cluster.ClusterID {
+ return false
+ }
+
+ defer req.Body.Close()
+ var request map[string]interface{}
+ err := json.NewDecoder(req.Body).Decode(&request)
+
+ containerRequest, ok := request["container_request"].(map[string]interface{})
+ if !ok {
+ log.Printf("wah wah")
+ return false
+ }
+
+ // If runtime_token is not set, create a new token
+ if _, ok := containerRequest["runtime_token"]; !ok {
+ log.Printf("ok %v", ok)
+
+ // First make sure supplied token is valid.
+ creds := auth.NewCredentials()
+ creds.LoadTokensFromHTTPRequest(req)
+
+ currentUser, err := h.handler.validateAPItoken(req, creds.Tokens[0])
+ if err != nil {
+ httpserver.Error(w, err.Error(), http.StatusForbidden)
+ return true
+ }
+
+ if len(currentUser.Authorization.Scopes) != 1 || currentUser.Authorization.Scopes[0] != "all" {
+ return false
+ }
+
+ newtok, err := h.handler.createAPItoken(req, currentUser.UUID, nil)
+ if err != nil {
+ httpserver.Error(w, err.Error(), http.StatusForbidden)
+ return true
+ }
+ containerRequest["runtime_token"] = newtok.TokenV2()
+ }
+
+ newbody, err := json.Marshal(request)
+ buf := bytes.NewBuffer(newbody)
+ req.Body = ioutil.NopCloser(buf)
+ req.ContentLength = int64(buf.Len())
+ req.Header.Set("Content-Length", fmt.Sprintf("%v", buf.Len()))
+
+ resp, err := h.handler.remoteClusterRequest(*clusterId, req)
+ h.handler.proxy.ForwardResponse(w, resp, err)
+ return true
+}
commit 6bc01c885f5fb3a234b2ee3acaf6e8d59cf60b4f
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Mon Oct 22 17:01:52 2018 -0400
14262: Fix tests
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/lib/controller/proxy.go b/lib/controller/proxy.go
index 9aecdc1b2..b7f3c4f72 100644
--- a/lib/controller/proxy.go
+++ b/lib/controller/proxy.go
@@ -71,9 +71,7 @@ func (p *proxy) ForwardRequest(
ctx := reqIn.Context()
if p.RequestTimeout > 0 {
- var cancel context.CancelFunc
- ctx, cancel = context.WithDeadline(ctx, time.Now().Add(time.Duration(p.RequestTimeout)))
- defer cancel()
+ ctx, _ = context.WithDeadline(ctx, time.Now().Add(time.Duration(p.RequestTimeout)))
}
reqOut := (&http.Request{
diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go
index 923cecdd5..254a0fa7d 100644
--- a/sdk/go/arvados/client.go
+++ b/sdk/go/arvados/client.go
@@ -103,7 +103,7 @@ var reqIDGen = httpserver.IDGenerator{Prefix: "req-"}
// (*http.Client)Do().
func (c *Client) Do(req *http.Request) (*http.Response, error) {
if c.AuthToken != "" {
- req.Header.Add("Authorization", "OAuth2 "+c.AuthToken)
+ req.Header.Set("Authorization", "OAuth2 "+c.AuthToken)
}
if req.Header.Get("X-Request-Id") == "" {
@@ -215,7 +215,7 @@ func (c *Client) MakeRequest(method, path string, body io.Reader, params interfa
req.Header.Set("Content-type", "application/x-www-form-urlencoded")
if c.AuthToken != "" {
- req.Header.Add("Authorization", "OAuth2 "+c.AuthToken)
+ req.Header.Set("Authorization", "OAuth2 "+c.AuthToken)
}
if req.Header.Get("X-Request-Id") == "" {
diff --git a/services/api/app/models/container.rb b/services/api/app/models/container.rb
index 7d8cc00f2..5d46ac29f 100644
--- a/services/api/app/models/container.rb
+++ b/services/api/app/models/container.rb
@@ -248,11 +248,14 @@ class Container < ArvadosModel
def self.resolve_container_image(container_image)
coll = Collection.for_latest_docker_image(container_image)
if !coll
- # Allow bare pdh without any additional checking otherwise
- # federated container requests won't work.
if loc = Keep::Locator.parse(container_image)
loc.strip_hints!
- return loc.to_s
+ if !Collection.where(portable_data_hash: loc.to_s).any?
+ # Allow bare pdh that doesn't exist in the local database so
+ # that federated container requests which refer to remotely
+ # stored containers will validate.
+ return loc.to_s
+ end
end
raise ArvadosModel::UnresolvableContainerError.new "docker image #{container_image.inspect} not found"
end
diff --git a/services/api/test/unit/container_request_test.rb b/services/api/test/unit/container_request_test.rb
index 8ff216e28..e4c3399c4 100644
--- a/services/api/test/unit/container_request_test.rb
+++ b/services/api/test/unit/container_request_test.rb
@@ -500,8 +500,7 @@ class ContainerRequestTest < ActiveSupport::TestCase
end
end
- ['acbd18db4cc2f85cedef654fccc4a4d8+3',
- 'ENOEXIST',
+ ['ENOEXIST',
'arvados/apitestfixture:ENOEXIST',
].each do |img|
test "container_image_for_container(#{img.inspect}) => 422" do
commit 010665613b937b41dc24896322c5ad05ae8707df
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Mon Oct 22 12:13:38 2018 -0400
14262: Tests for setting and checking container tokens.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/lib/controller/federation.go b/lib/controller/federation.go
index 18f3e4479..dc0aa908c 100644
--- a/lib/controller/federation.go
+++ b/lib/controller/federation.go
@@ -185,9 +185,9 @@ func (h *Handler) createAPItoken(req *http.Request, userUUID string, scopes []st
(uuid, api_token, expires_at, scopes,
user_id,
api_client_id, created_at, updated_at)
-VALUES ($1, $2, now() + INTERVAL '2 weeks', $3,
+VALUES ($1, $2, CURRENT_TIMESTAMP + INTERVAL '2 weeks', $3,
(SELECT id FROM users WHERE users.uuid=$4 LIMIT 1),
-0, now(), now())`,
+0, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)`,
uuid, token, string(scopesjson), userUUID)
if err != nil {
diff --git a/lib/controller/federation_test.go b/lib/controller/federation_test.go
index 23d5d7ca7..7842ad05d 100644
--- a/lib/controller/federation_test.go
+++ b/lib/controller/federation_test.go
@@ -5,8 +5,10 @@
package controller
import (
+ "bytes"
"encoding/json"
"fmt"
+ "io"
"io/ioutil"
"net/http"
"net/http/httptest"
@@ -90,6 +92,10 @@ func (s *FederationSuite) SetUpTest(c *check.C) {
}
func (s *FederationSuite) remoteMockHandler(w http.ResponseWriter, req *http.Request) {
+ b := &bytes.Buffer{}
+ io.Copy(b, req.Body)
+ req.Body = ioutil.NopCloser(b)
+ req.Body.Close()
s.remoteMockRequests = append(s.remoteMockRequests, *req)
}
@@ -567,6 +573,67 @@ func (s *FederationSuite) TestCreateRemoteContainerRequest(c *check.C) {
c.Check(strings.HasPrefix(cr.UUID, "zzzzz-"), check.Equals, true)
}
+func (s *FederationSuite) TestCreateRemoteContainerRequestCheckRuntimeToken(c *check.C) {
+ // Send request to zmock and check that outgoing request has
+ // runtime_token sent (because runtime_token isn't returned in
+ // the response).
+
+ defer s.localServiceReturns404(c).Close()
+ // pass cluster_id via query parameter, this allows arvados-controller
+ // to avoid parsing the body
+ req := httptest.NewRequest("POST", "/arvados/v1/container_requests?cluster_id=zmock",
+ strings.NewReader(`{
+ "container_request": {
+ "name": "hello world",
+ "state": "Uncommitted",
+ "output_path": "/",
+ "container_image": "123",
+ "command": ["abc"]
+ }
+}
+`))
+ req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+ req.Header.Set("Content-type", "application/json")
+ resp := s.testRequest(req)
+ c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+ var cr struct {
+ arvados.ContainerRequest `json:"container_request"`
+ }
+ c.Check(json.NewDecoder(s.remoteMockRequests[0].Body).Decode(&cr), check.IsNil)
+ c.Check(strings.HasPrefix(cr.ContainerRequest.RuntimeToken, "v2/"), check.Equals, true)
+}
+
+func (s *FederationSuite) TestCreateRemoteContainerRequestCheckSetRuntimeToken(c *check.C) {
+ // Send request to zmock and check that outgoing request has
+ // runtime_token sent (because runtime_token isn't returned in
+ // the response).
+
+ defer s.localServiceReturns404(c).Close()
+ // pass cluster_id via query parameter, this allows arvados-controller
+ // to avoid parsing the body
+ req := httptest.NewRequest("POST", "/arvados/v1/container_requests?cluster_id=zmock",
+ strings.NewReader(`{
+ "container_request": {
+ "name": "hello world",
+ "state": "Uncommitted",
+ "output_path": "/",
+ "container_image": "123",
+ "command": ["abc"],
+ "runtime_token": "xyz"
+ }
+}
+`))
+ req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+ req.Header.Set("Content-type", "application/json")
+ resp := s.testRequest(req)
+ c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+ var cr struct {
+ arvados.ContainerRequest `json:"container_request"`
+ }
+ c.Check(json.NewDecoder(s.remoteMockRequests[0].Body).Decode(&cr), check.IsNil)
+ c.Check(cr.ContainerRequest.RuntimeToken, check.Equals, "xyz")
+}
+
func (s *FederationSuite) TestCreateRemoteContainerRequestError(c *check.C) {
defer s.localServiceReturns404(c).Close()
// pass cluster_id via query parameter, this allows arvados-controller
diff --git a/sdk/go/arvados/container.go b/sdk/go/arvados/container.go
index 2622c1370..b70b4ac91 100644
--- a/sdk/go/arvados/container.go
+++ b/sdk/go/arvados/container.go
@@ -56,6 +56,7 @@ type ContainerRequest struct {
UseExisting bool `json:"use_existing"`
LogUUID string `json:"log_uuid"`
OutputUUID string `json:"output_uuid"`
+ RuntimeToken string `json:"runtime_token"`
}
// Mount is special behavior to attach to a filesystem path or device.
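
The new federation tests decode the recorded request body into an anonymous struct that embeds arvados.ContainerRequest under the container_request key, which is a compact way to unwrap a nested JSON object. A standalone sketch of the same trick, with a local type standing in for the SDK struct:

package main

import (
	"encoding/json"
	"fmt"
)

// ContainerRequest stands in for arvados.ContainerRequest.
type ContainerRequest struct {
	Name         string `json:"name"`
	RuntimeToken string `json:"runtime_token"`
}

func main() {
	body := []byte(`{"container_request": {"name": "hello world", "runtime_token": "xyz"}}`)

	// Tagging the embedded field makes encoding/json map the nested
	// "container_request" object onto it, while field promotion still
	// lets us read cr.Name and cr.RuntimeToken directly.
	var cr struct {
		ContainerRequest `json:"container_request"`
	}
	if err := json.Unmarshal(body, &cr); err != nil {
		panic(err)
	}
	fmt.Println(cr.Name, cr.RuntimeToken) // hello world xyz
}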
commit c2c01627bc1e487616e8d6cb5e5f9ff61717b851
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Mon Oct 22 11:02:02 2018 -0400
14262: Add createAPIToken, with test
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/lib/controller/fed_generic.go b/lib/controller/fed_generic.go
index 0630217b6..63e61e690 100644
--- a/lib/controller/fed_generic.go
+++ b/lib/controller/fed_generic.go
@@ -17,10 +17,20 @@ import (
"git.curoverse.com/arvados.git/sdk/go/httpserver"
)
+type federatedRequestDelegate func(
+ h *genericFederatedRequestHandler,
+ effectiveMethod string,
+ clusterId *string,
+ uuid string,
+ remainder string,
+ w http.ResponseWriter,
+ req *http.Request) bool
+
type genericFederatedRequestHandler struct {
- next http.Handler
- handler *Handler
- matcher *regexp.Regexp
+ next http.Handler
+ handler *Handler
+ matcher *regexp.Regexp
+ delegates []federatedRequestDelegate
}
func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter,
@@ -285,6 +295,12 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h
return
}
+ for _, d := range h.delegates {
+ if d(h, effectiveMethod, &clusterId, m[1], m[3], w, req) {
+ return
+ }
+ }
+
if clusterId == "" || clusterId == h.handler.Cluster.ClusterID {
h.next.ServeHTTP(w, req)
} else {
diff --git a/lib/controller/federation.go b/lib/controller/federation.go
index 03d2f3fab..18f3e4479 100644
--- a/lib/controller/federation.go
+++ b/lib/controller/federation.go
@@ -7,6 +7,7 @@ package controller
import (
"bytes"
"database/sql"
+ "encoding/json"
"fmt"
"io"
"io/ioutil"
@@ -17,6 +18,7 @@ import (
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/auth"
+ "github.com/jmcvetta/randutil"
)
var pathPattern = `^/arvados/v1/%s(/([0-9a-z]{5})-%s-[0-9a-z]{15})?(.*)$`
@@ -82,12 +84,18 @@ func loadParamsFromForm(req *http.Request) error {
func (h *Handler) setupProxyRemoteCluster(next http.Handler) http.Handler {
mux := http.NewServeMux()
- mux.Handle("/arvados/v1/workflows", &genericFederatedRequestHandler{next, h, wfRe})
- mux.Handle("/arvados/v1/workflows/", &genericFederatedRequestHandler{next, h, wfRe})
- mux.Handle("/arvados/v1/containers", &genericFederatedRequestHandler{next, h, containersRe})
- mux.Handle("/arvados/v1/containers/", &genericFederatedRequestHandler{next, h, containersRe})
- mux.Handle("/arvados/v1/container_requests", &genericFederatedRequestHandler{next, h, containerRequestsRe})
- mux.Handle("/arvados/v1/container_requests/", &genericFederatedRequestHandler{next, h, containerRequestsRe})
+
+ wfHandler := &genericFederatedRequestHandler{next, h, wfRe, nil}
+ containersHandler := &genericFederatedRequestHandler{next, h, containersRe, nil}
+ containerRequestsHandler := &genericFederatedRequestHandler{next, h, containerRequestsRe,
+ []federatedRequestDelegate{remoteContainerRequestCreate}}
+
+ mux.Handle("/arvados/v1/workflows", wfHandler)
+ mux.Handle("/arvados/v1/workflows/", wfHandler)
+ mux.Handle("/arvados/v1/containers", containersHandler)
+ mux.Handle("/arvados/v1/containers/", containersHandler)
+ mux.Handle("/arvados/v1/container_requests", containerRequestsHandler)
+ mux.Handle("/arvados/v1/container_requests/", containerRequestsHandler)
mux.Handle("/arvados/v1/collections", next)
mux.Handle("/arvados/v1/collections/", &collectionFederatedRequestHandler{next, h})
mux.Handle("/", next)
@@ -118,12 +126,79 @@ type CurrentUser struct {
UUID string
}
-func (h *Handler) validateAPItoken(req *http.Request, user *CurrentUser) error {
+// validateAPItoken extracts the token from the provided http request,
+// checks it again api_client_authorizations table in the database,
+// and fills in the token scope and user UUID. Does not handle remote
+// tokens unless they are already in the database and not expired.
+func (h *Handler) validateAPItoken(req *http.Request, token string) (*CurrentUser, error) {
+ user := CurrentUser{Authorization: arvados.APIClientAuthorization{APIToken: token}}
db, err := h.db(req)
if err != nil {
- return err
+ return nil, err
+ }
+
+ var uuid string
+ if strings.HasPrefix(token, "v2/") {
+ sp := strings.Split(token, "/")
+ uuid = sp[1]
+ token = sp[2]
+ }
+ user.Authorization.APIToken = token
+ var scopes string
+ err = db.QueryRowContext(req.Context(), `SELECT api_client_authorizations.uuid, api_client_authorizations.scopes, users.uuid FROM api_client_authorizations JOIN users on api_client_authorizations.user_id=users.id WHERE api_token=$1 AND (expires_at IS NULL OR expires_at > current_timestamp) LIMIT 1`, token).Scan(&user.Authorization.UUID, &scopes, &user.UUID)
+ if err != nil {
+ return nil, err
+ }
+ if uuid != "" && user.Authorization.UUID != uuid {
+ return nil, fmt.Errorf("UUID embedded in v2 token did not match record")
+ }
+ err = json.Unmarshal([]byte(scopes), &user.Authorization.Scopes)
+ if err != nil {
+ return nil, err
+ }
+ return &user, nil
+}
+
+func (h *Handler) createAPItoken(req *http.Request, userUUID string, scopes []string) (*arvados.APIClientAuthorization, error) {
+ db, err := h.db(req)
+ if err != nil {
+ return nil, err
+ }
+ rd, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
+ if err != nil {
+ return nil, err
+ }
+ uuid := fmt.Sprintf("%v-gj3su-%v", h.Cluster.ClusterID, rd)
+ token, err := randutil.String(50, "abcdefghijklmnopqrstuvwxyz0123456789")
+ if err != nil {
+ return nil, err
+ }
+ if len(scopes) == 0 {
+ scopes = append(scopes, "all")
}
- return db.QueryRowContext(req.Context(), `SELECT api_client_authorizations.uuid, users.uuid FROM api_client_authorizations JOIN users on api_client_authorizations.user_id=users.id WHERE api_token=$1 AND (expires_at IS NULL OR expires_at > current_timestamp) LIMIT 1`, user.Authorization.APIToken).Scan(&user.Authorization.UUID, &user.UUID)
+ scopesjson, err := json.Marshal(scopes)
+ if err != nil {
+ return nil, err
+ }
+ _, err = db.ExecContext(req.Context(),
+ `INSERT INTO api_client_authorizations
+(uuid, api_token, expires_at, scopes,
+user_id,
+api_client_id, created_at, updated_at)
+VALUES ($1, $2, now() + INTERVAL '2 weeks', $3,
+(SELECT id FROM users WHERE users.uuid=$4 LIMIT 1),
+0, now(), now())`,
+ uuid, token, string(scopesjson), userUUID)
+
+ if err != nil {
+ return nil, err
+ }
+
+ return &arvados.APIClientAuthorization{
+ UUID: uuid,
+ APIToken: token,
+ ExpiresAt: "",
+ Scopes: scopes}, nil
}
// Extract the auth token supplied in req, and replace it with a
@@ -165,11 +240,10 @@ func (h *Handler) saltAuthToken(req *http.Request, remote string) (updatedReq *h
// If the token exists in our own database, salt it
// for the remote. Otherwise, assume it was issued by
// the remote, and pass it through unmodified.
- currentUser := CurrentUser{Authorization: arvados.APIClientAuthorization{APIToken: creds.Tokens[0]}}
- err = h.validateAPItoken(req, &currentUser)
+ currentUser, err := h.validateAPItoken(req, creds.Tokens[0])
if err == sql.ErrNoRows {
// Not ours; pass through unmodified.
- token = currentUser.Authorization.APIToken
+ token = creds.Tokens[0]
} else if err != nil {
return nil, err
} else {
diff --git a/lib/controller/handler_test.go b/lib/controller/handler_test.go
index 963fd1159..746b9242f 100644
--- a/lib/controller/handler_test.go
+++ b/lib/controller/handler_test.go
@@ -130,3 +130,39 @@ func (s *HandlerSuite) TestProxyRedirect(c *check.C) {
c.Check(resp.Code, check.Equals, http.StatusFound)
c.Check(resp.Header().Get("Location"), check.Matches, `https://0.0.0.0:1/auth/joshid\?return_to=foo&?`)
}
+
+func (s *HandlerSuite) TestValidateV1APIToken(c *check.C) {
+ req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
+ user, err := s.handler.(*Handler).validateAPItoken(req, arvadostest.ActiveToken)
+ c.Assert(err, check.IsNil)
+ c.Check(user.Authorization.UUID, check.Equals, arvadostest.ActiveTokenUUID)
+ c.Check(user.Authorization.APIToken, check.Equals, arvadostest.ActiveToken)
+ c.Check(user.Authorization.Scopes, check.DeepEquals, []string{"all"})
+ c.Check(user.UUID, check.Equals, arvadostest.ActiveUserUUID)
+}
+
+func (s *HandlerSuite) TestValidateV2APIToken(c *check.C) {
+ req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
+ user, err := s.handler.(*Handler).validateAPItoken(req, arvadostest.ActiveTokenV2)
+ c.Assert(err, check.IsNil)
+ c.Check(user.Authorization.UUID, check.Equals, arvadostest.ActiveTokenUUID)
+ c.Check(user.Authorization.APIToken, check.Equals, arvadostest.ActiveToken)
+ c.Check(user.Authorization.Scopes, check.DeepEquals, []string{"all"})
+ c.Check(user.UUID, check.Equals, arvadostest.ActiveUserUUID)
+ c.Check(user.Authorization.TokenV2(), check.Equals, arvadostest.ActiveTokenV2)
+}
+
+func (s *HandlerSuite) TestCreateAPIToken(c *check.C) {
+ req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
+ auth, err := s.handler.(*Handler).createAPItoken(req, arvadostest.ActiveUserUUID, nil)
+ c.Assert(err, check.IsNil)
+ c.Check(auth.Scopes, check.DeepEquals, []string{"all"})
+
+ user, err := s.handler.(*Handler).validateAPItoken(req, auth.TokenV2())
+ c.Assert(err, check.IsNil)
+ c.Check(user.Authorization.UUID, check.Equals, auth.UUID)
+ c.Check(user.Authorization.APIToken, check.Equals, auth.APIToken)
+ c.Check(user.Authorization.Scopes, check.DeepEquals, []string{"all"})
+ c.Check(user.UUID, check.Equals, arvadostest.ActiveUserUUID)
+ c.Check(user.Authorization.TokenV2(), check.Equals, auth.TokenV2())
+}
diff --git a/sdk/go/arvados/api_client_authorization.go b/sdk/go/arvados/api_client_authorization.go
index ec0239eb3..17cff235d 100644
--- a/sdk/go/arvados/api_client_authorization.go
+++ b/sdk/go/arvados/api_client_authorization.go
@@ -6,8 +6,10 @@ package arvados
// APIClientAuthorization is an arvados#apiClientAuthorization resource.
type APIClientAuthorization struct {
- UUID string `json:"uuid"`
- APIToken string `json:"api_token"`
+ UUID string `json:"uuid,omitempty"`
+ APIToken string `json:"api_token,omitempty"`
+ ExpiresAt string `json:"expires_at,omitempty"`
+ Scopes []string `json:"scopes,omitempty"`
}
// APIClientAuthorizationList is an arvados#apiClientAuthorizationList resource.
diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go
index cca9f9bf1..923cecdd5 100644
--- a/sdk/go/arvados/client.go
+++ b/sdk/go/arvados/client.go
@@ -193,37 +193,62 @@ func anythingToValues(params interface{}) (url.Values, error) {
return urlValues, nil
}
-// RequestAndDecode performs an API request and unmarshals the
-// response (which must be JSON) into dst. Method and body arguments
-// are the same as for http.NewRequest(). The given path is added to
-// the server's scheme/host/port to form the request URL. The given
-// params are passed via POST form or query string.
-//
-// path must not contain a query string.
-func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error {
- if body, ok := body.(io.Closer); ok {
- // Ensure body is closed even if we error out early
- defer body.Close()
- }
+func (c *Client) MakeRequest(method, path string, body io.Reader, params interface{}) (*http.Request, error) {
urlString := c.apiURL(path)
urlValues, err := anythingToValues(params)
if err != nil {
- return err
+ return nil, err
}
if (method == "GET" || body != nil) && urlValues != nil {
// FIXME: what if params don't fit in URL
u, err := url.Parse(urlString)
if err != nil {
- return err
+ return nil, err
}
u.RawQuery = urlValues.Encode()
urlString = u.String()
}
req, err := http.NewRequest(method, urlString, body)
if err != nil {
- return err
+ return nil, err
}
req.Header.Set("Content-type", "application/x-www-form-urlencoded")
+
+ if c.AuthToken != "" {
+ req.Header.Add("Authorization", "OAuth2 "+c.AuthToken)
+ }
+
+ if req.Header.Get("X-Request-Id") == "" {
+ reqid, _ := c.context().Value(contextKeyRequestID).(string)
+ if reqid == "" {
+ reqid = reqIDGen.Next()
+ }
+ if req.Header == nil {
+ req.Header = http.Header{"X-Request-Id": {reqid}}
+ } else {
+ req.Header.Set("X-Request-Id", reqid)
+ }
+ }
+
+ return req, nil
+}
+
+// RequestAndDecode performs an API request and unmarshals the
+// response (which must be JSON) into dst. Method and body arguments
+// are the same as for http.NewRequest(). The given path is added to
+// the server's scheme/host/port to form the request URL. The given
+// params are passed via POST form or query string.
+//
+// path must not contain a query string.
+func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error {
+ if body, ok := body.(io.Closer); ok {
+ // Ensure body is closed even if we error out early
+ defer body.Close()
+ }
+ req, err := c.MakeRequest(method, path, body, params)
+ if err != nil {
+ return err
+ }
return c.DoAndDecode(dst, req)
}
diff --git a/sdk/go/arvadostest/fixtures.go b/sdk/go/arvadostest/fixtures.go
index 114faf17b..e0f248313 100644
--- a/sdk/go/arvadostest/fixtures.go
+++ b/sdk/go/arvadostest/fixtures.go
@@ -8,6 +8,7 @@ package arvadostest
const (
SpectatorToken = "zw2f4gwx8hw8cjre7yp6v1zylhrhn3m5gvjq73rtpwhmknrybu"
ActiveToken = "3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"
+ ActiveTokenUUID = "zzzzz-gj3su-077z32aux8dg2s1"
ActiveTokenV2 = "v2/zzzzz-gj3su-077z32aux8dg2s1/3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"
AdminToken = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h"
AnonymousToken = "4kg6k6lzmp9kj4cpkcoxie964cmvjahbt4fod9zru44k4jqdmi"
diff --git a/vendor/vendor.json b/vendor/vendor.json
index aa6b2d773..9abb9bb15 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -313,6 +313,12 @@
"revisionTime": "2015-07-11T00:45:18Z"
},
{
+ "checksumSHA1": "khL6oKjx81rAZKW+36050b7f5As=",
+ "path": "github.com/jmcvetta/randutil",
+ "revision": "2bb1b664bcff821e02b2a0644cd29c7e824d54f8",
+ "revisionTime": "2015-08-17T12:26:01Z"
+ },
+ {
"checksumSHA1": "oX6jFQD74oOApvDIhOzW2dXpg5Q=",
"path": "github.com/kevinburke/ssh_config",
"revision": "802051befeb51da415c46972b5caf36e7c33c53d",
commit 5cb22e56e724370c51022585a25fbe665171429d
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Thu Oct 18 17:17:33 2018 -0400
14262: Refactoring, split up federation code into smaller files
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/lib/controller/fed_collections.go b/lib/controller/fed_collections.go
new file mode 100644
index 000000000..62f98367c
--- /dev/null
+++ b/lib/controller/fed_collections.go
@@ -0,0 +1,312 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package controller
+
+import (
+ "bufio"
+ "bytes"
+ "context"
+ "crypto/md5"
+ "encoding/json"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "strings"
+ "sync"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+)
+
+type collectionFederatedRequestHandler struct {
+ next http.Handler
+ handler *Handler
+}
+
+func rewriteSignatures(clusterID string, expectHash string,
+ resp *http.Response, requestError error) (newResponse *http.Response, err error) {
+
+ if requestError != nil {
+ return resp, requestError
+ }
+
+ if resp.StatusCode != 200 {
+ return resp, nil
+ }
+
+ originalBody := resp.Body
+ defer originalBody.Close()
+
+ var col arvados.Collection
+ err = json.NewDecoder(resp.Body).Decode(&col)
+ if err != nil {
+ return nil, err
+ }
+
+ // rewriting signatures will make manifest text 5-10% bigger so calculate
+ // capacity accordingly
+ updatedManifest := bytes.NewBuffer(make([]byte, 0, int(float64(len(col.ManifestText))*1.1)))
+
+ hasher := md5.New()
+ mw := io.MultiWriter(hasher, updatedManifest)
+ sz := 0
+
+ scanner := bufio.NewScanner(strings.NewReader(col.ManifestText))
+ scanner.Buffer(make([]byte, 1048576), len(col.ManifestText))
+ for scanner.Scan() {
+ line := scanner.Text()
+ tokens := strings.Split(line, " ")
+ if len(tokens) < 3 {
+ return nil, fmt.Errorf("Invalid stream (<3 tokens): %q", line)
+ }
+
+ n, err := mw.Write([]byte(tokens[0]))
+ if err != nil {
+ return nil, fmt.Errorf("Error updating manifest: %v", err)
+ }
+ sz += n
+ for _, token := range tokens[1:] {
+ n, err = mw.Write([]byte(" "))
+ if err != nil {
+ return nil, fmt.Errorf("Error updating manifest: %v", err)
+ }
+ sz += n
+
+ m := keepclient.SignedLocatorRe.FindStringSubmatch(token)
+ if m != nil {
+ // Rewrite the block signature to be a remote signature
+ _, err = fmt.Fprintf(updatedManifest, "%s%s%s+R%s-%s%s", m[1], m[2], m[3], clusterID, m[5][2:], m[8])
+ if err != nil {
+ return nil, fmt.Errorf("Error updating manifest: %v", err)
+ }
+
+ // for hash checking, ignore signatures
+ n, err = fmt.Fprintf(hasher, "%s%s", m[1], m[2])
+ if err != nil {
+ return nil, fmt.Errorf("Error updating manifest: %v", err)
+ }
+ sz += n
+ } else {
+ n, err = mw.Write([]byte(token))
+ if err != nil {
+ return nil, fmt.Errorf("Error updating manifest: %v", err)
+ }
+ sz += n
+ }
+ }
+ n, err = mw.Write([]byte("\n"))
+ if err != nil {
+ return nil, fmt.Errorf("Error updating manifest: %v", err)
+ }
+ sz += n
+ }
+
+ // Check that expected hash is consistent with
+ // portable_data_hash field of the returned record
+ if expectHash == "" {
+ expectHash = col.PortableDataHash
+ } else if expectHash != col.PortableDataHash {
+ return nil, fmt.Errorf("portable_data_hash %q on returned record did not match expected hash %q ", expectHash, col.PortableDataHash)
+ }
+
+ // Certify that the computed hash of the manifest_text matches our expectation
+ sum := hasher.Sum(nil)
+ computedHash := fmt.Sprintf("%x+%v", sum, sz)
+ if computedHash != expectHash {
+ return nil, fmt.Errorf("Computed manifest_text hash %q did not match expected hash %q", computedHash, expectHash)
+ }
+
+ col.ManifestText = updatedManifest.String()
+
+ newbody, err := json.Marshal(col)
+ if err != nil {
+ return nil, err
+ }
+
+ buf := bytes.NewBuffer(newbody)
+ resp.Body = ioutil.NopCloser(buf)
+ resp.ContentLength = int64(buf.Len())
+ resp.Header.Set("Content-Length", fmt.Sprintf("%v", buf.Len()))
+
+ return resp, nil
+}
+
+func filterLocalClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
+ if requestError != nil {
+ return resp, requestError
+ }
+
+ if resp.StatusCode == 404 {
+ // Suppress returning this result, because we want to
+ // search the federation.
+ return nil, nil
+ }
+ return resp, nil
+}
+
+type searchRemoteClusterForPDH struct {
+ pdh string
+ remoteID string
+ mtx *sync.Mutex
+ sentResponse *bool
+ sharedContext *context.Context
+ cancelFunc func()
+ errors *[]string
+ statusCode *int
+}
+
+func (s *searchRemoteClusterForPDH) filterRemoteClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
+ s.mtx.Lock()
+ defer s.mtx.Unlock()
+
+ if *s.sentResponse {
+ // Another request already returned a response
+ return nil, nil
+ }
+
+ if requestError != nil {
+ *s.errors = append(*s.errors, fmt.Sprintf("Request error contacting %q: %v", s.remoteID, requestError))
+ // Record the error and suppress response
+ return nil, nil
+ }
+
+ if resp.StatusCode != 200 {
+ // Suppress returning unsuccessful result. Maybe
+ // another request will find it.
+ // TODO collect and return error responses.
+ *s.errors = append(*s.errors, fmt.Sprintf("Response from %q: %v", s.remoteID, resp.Status))
+ if resp.StatusCode != 404 {
+ // Got a non-404 error response, convert into BadGateway
+ *s.statusCode = http.StatusBadGateway
+ }
+ return nil, nil
+ }
+
+ s.mtx.Unlock()
+
+ // This reads the response body. We don't want to hold the
+ // lock while doing this because other remote requests could
+ // also have made it to this point, and we don't want a
+ // slow response holding the lock to block a faster response
+ // that is waiting on the lock.
+ newResponse, err = rewriteSignatures(s.remoteID, s.pdh, resp, nil)
+
+ s.mtx.Lock()
+
+ if *s.sentResponse {
+ // Another request already returned a response
+ return nil, nil
+ }
+
+ if err != nil {
+ // Suppress returning unsuccessful result. Maybe
+ // another request will be successful.
+ *s.errors = append(*s.errors, fmt.Sprintf("Error parsing response from %q: %v", s.remoteID, err))
+ return nil, nil
+ }
+
+ // We have a successful response. Suppress/cancel all the
+ // other requests/responses.
+ *s.sentResponse = true
+ s.cancelFunc()
+
+ return newResponse, nil
+}
+
+func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+ if req.Method != "GET" {
+ // Only handle GET requests right now
+ h.next.ServeHTTP(w, req)
+ return
+ }
+
+ m := collectionByPDHRe.FindStringSubmatch(req.URL.Path)
+ if len(m) != 2 {
+ // Not a collection PDH GET request
+ m = collectionRe.FindStringSubmatch(req.URL.Path)
+ clusterId := ""
+
+ if len(m) > 0 {
+ clusterId = m[2]
+ }
+
+ if clusterId != "" && clusterId != h.handler.Cluster.ClusterID {
+ // request for remote collection by uuid
+ resp, err := h.handler.remoteClusterRequest(clusterId, req)
+ newResponse, err := rewriteSignatures(clusterId, "", resp, err)
+ h.handler.proxy.ForwardResponse(w, newResponse, err)
+ return
+ }
+ // not a collection UUID request, or it is a request
+ // for a local UUID, either way, continue down the
+ // handler stack.
+ h.next.ServeHTTP(w, req)
+ return
+ }
+
+ // Request for collection by PDH. Search the federation.
+
+ // First, query the local cluster.
+ resp, err := h.handler.localClusterRequest(req)
+ newResp, err := filterLocalClusterResponse(resp, err)
+ if newResp != nil || err != nil {
+ h.handler.proxy.ForwardResponse(w, newResp, err)
+ return
+ }
+
+ sharedContext, cancelFunc := context.WithCancel(req.Context())
+ defer cancelFunc()
+ req = req.WithContext(sharedContext)
+
+ // Create a goroutine for each cluster in the
+ // RemoteClusters map. The first valid result gets
+ // returned to the client. When that happens, all
+ // other outstanding requests are cancelled or
+ // suppressed.
+ sentResponse := false
+ mtx := sync.Mutex{}
+ wg := sync.WaitGroup{}
+ var errors []string
+ var errorCode int = 404
+
+ // use channel as a semaphore to limit the number of concurrent
+ // requests at a time
+ sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
+ defer close(sem)
+ for remoteID := range h.handler.Cluster.RemoteClusters {
+ if remoteID == h.handler.Cluster.ClusterID {
+ // No need to query local cluster again
+ continue
+ }
+ // blocks until it can put a value into the
+ // channel (which has a max queue capacity)
+ sem <- true
+ if sentResponse {
+ break
+ }
+ search := &searchRemoteClusterForPDH{m[1], remoteID, &mtx, &sentResponse,
+ &sharedContext, cancelFunc, &errors, &errorCode}
+ wg.Add(1)
+ go func() {
+ resp, err := h.handler.remoteClusterRequest(search.remoteID, req)
+ newResp, err := search.filterRemoteClusterResponse(resp, err)
+ if newResp != nil || err != nil {
+ h.handler.proxy.ForwardResponse(w, newResp, err)
+ }
+ wg.Done()
+ <-sem
+ }()
+ }
+ wg.Wait()
+
+ if sentResponse {
+ return
+ }
+
+ // No successful responses, so return the error
+ httpserver.Errors(w, errors, errorCode)
+}
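The ServeHTTP above combines three concurrency pieces: a buffered channel used as a semaphore, a shared cancellable context, and a mutex-guarded sentResponse flag. A self-contained sketch of the same pattern, with a made-up probe function standing in for the remote cluster requests:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// probe simulates one remote lookup; slower "clusters" take longer to answer.
func probe(ctx context.Context, id int) (string, error) {
	select {
	case <-time.After(time.Duration(id) * 100 * time.Millisecond):
		return fmt.Sprintf("result from %d", id), nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sem := make(chan bool, 2) // channel as semaphore: at most 2 probes in flight
	var (
		mtx    sync.Mutex
		wg     sync.WaitGroup
		sent   bool
		winner string
	)
	for id := 1; id <= 5; id++ {
		sem <- true // blocks while the semaphore is at capacity
		mtx.Lock()
		done := sent
		mtx.Unlock()
		if done {
			<-sem
			break
		}
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			defer func() { <-sem }()
			res, err := probe(ctx, id)
			mtx.Lock()
			defer mtx.Unlock()
			if err == nil && !sent {
				// First successful probe wins; cancel the rest.
				sent = true
				winner = res
				cancel()
			}
		}(id)
	}
	wg.Wait()
	fmt.Println("winner:", winner)
}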
diff --git a/lib/controller/fed_generic.go b/lib/controller/fed_generic.go
new file mode 100644
index 000000000..0630217b6
--- /dev/null
+++ b/lib/controller/fed_generic.go
@@ -0,0 +1,331 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package controller
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "net/url"
+ "regexp"
+ "sync"
+
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
+)
+
+type genericFederatedRequestHandler struct {
+ next http.Handler
+ handler *Handler
+ matcher *regexp.Regexp
+}
+
+func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter,
+ req *http.Request,
+ clusterID string, uuids []string) (rp []map[string]interface{}, kind string, err error) {
+
+ found := make(map[string]bool)
+ prev_len_uuids := len(uuids) + 1
+ // Loop while:
+ // (1) there are more uuids to query, and
+ // (2) we're making progress - on each iteration the set of
+ // uuids we are still waiting for must shrink.
+ for len(uuids) > 0 && len(uuids) < prev_len_uuids {
+ var remoteReq http.Request
+ remoteReq.Header = req.Header
+ remoteReq.Method = "POST"
+ remoteReq.URL = &url.URL{Path: req.URL.Path}
+ remoteParams := make(url.Values)
+ remoteParams.Set("_method", "GET")
+ remoteParams.Set("count", "none")
+ if req.Form.Get("select") != "" {
+ remoteParams.Set("select", req.Form.Get("select"))
+ }
+ content, err := json.Marshal(uuids)
+ if err != nil {
+ return nil, "", err
+ }
+ remoteParams["filters"] = []string{fmt.Sprintf(`[["uuid", "in", %s]]`, content)}
+ enc := remoteParams.Encode()
+ remoteReq.Body = ioutil.NopCloser(bytes.NewBufferString(enc))
+
+ rc := multiClusterQueryResponseCollector{clusterID: clusterID}
+
+ var resp *http.Response
+ if clusterID == h.handler.Cluster.ClusterID {
+ resp, err = h.handler.localClusterRequest(&remoteReq)
+ } else {
+ resp, err = h.handler.remoteClusterRequest(clusterID, &remoteReq)
+ }
+ rc.collectResponse(resp, err)
+
+ if rc.error != nil {
+ return nil, "", rc.error
+ }
+
+ kind = rc.kind
+
+ if len(rc.responses) == 0 {
+ // We got zero responses, no point in doing
+ // another query.
+ return rp, kind, nil
+ }
+
+ rp = append(rp, rc.responses...)
+
+ // Go through the responses and determine what was
+ // returned. If there are remaining items, loop
+ // around and do another request with just the
+ // stragglers.
+ for _, i := range rc.responses {
+ uuid, ok := i["uuid"].(string)
+ if ok {
+ found[uuid] = true
+ }
+ }
+
+ l := []string{}
+ for _, u := range uuids {
+ if !found[u] {
+ l = append(l, u)
+ }
+ }
+ prev_len_uuids = len(uuids)
+ uuids = l
+ }
+
+ return rp, kind, nil
+}
+
+func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.ResponseWriter,
+ req *http.Request, clusterId *string) bool {
+
+ var filters [][]interface{}
+ err := json.Unmarshal([]byte(req.Form.Get("filters")), &filters)
+ if err != nil {
+ httpserver.Error(w, err.Error(), http.StatusBadRequest)
+ return true
+ }
+
+ // Split the list of uuids by prefix
+ queryClusters := make(map[string][]string)
+ expectCount := 0
+ for _, filter := range filters {
+ if len(filter) != 3 {
+ return false
+ }
+
+ if lhs, ok := filter[0].(string); !ok || lhs != "uuid" {
+ return false
+ }
+
+ op, ok := filter[1].(string)
+ if !ok {
+ return false
+ }
+
+ if op == "in" {
+ if rhs, ok := filter[2].([]interface{}); ok {
+ for _, i := range rhs {
+ if u, ok := i.(string); ok {
+ *clusterId = u[0:5]
+ queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
+ expectCount += 1
+ }
+ }
+ }
+ } else if op == "=" {
+ if u, ok := filter[2].(string); ok {
+ *clusterId = u[0:5]
+ queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
+ expectCount += 1
+ }
+ } else {
+ return false
+ }
+
+ }
+
+ if len(queryClusters) <= 1 {
+ // Query does not search for uuids across multiple
+ // clusters.
+ return false
+ }
+
+ // Validations
+ count := req.Form.Get("count")
+ if count != "" && count != `none` && count != `"none"` {
+ httpserver.Error(w, "Federated multi-object query must have 'count=none'", http.StatusBadRequest)
+ return true
+ }
+ if req.Form.Get("limit") != "" || req.Form.Get("offset") != "" || req.Form.Get("order") != "" {
+ httpserver.Error(w, "Federated multi-object may not provide 'limit', 'offset' or 'order'.", http.StatusBadRequest)
+ return true
+ }
+ if expectCount > h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse() {
+ httpserver.Error(w, fmt.Sprintf("Federated multi-object request for %v objects which is more than max page size %v.",
+ expectCount, h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse()), http.StatusBadRequest)
+ return true
+ }
+ if req.Form.Get("select") != "" {
+ foundUUID := false
+ var selects []string
+ err := json.Unmarshal([]byte(req.Form.Get("select")), &selects)
+ if err != nil {
+ httpserver.Error(w, err.Error(), http.StatusBadRequest)
+ return true
+ }
+
+ for _, r := range selects {
+ if r == "uuid" {
+ foundUUID = true
+ break
+ }
+ }
+ if !foundUUID {
+ httpserver.Error(w, "Federated multi-object request must include 'uuid' in 'select'", http.StatusBadRequest)
+ return true
+ }
+ }
+
+ // Perform concurrent requests to each cluster
+
+ // use channel as a semaphore to limit the number of concurrent
+ // requests at a time
+ sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
+ defer close(sem)
+ wg := sync.WaitGroup{}
+
+ req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
+ mtx := sync.Mutex{}
+ errors := []error{}
+ var completeResponses []map[string]interface{}
+ var kind string
+
+ for k, v := range queryClusters {
+ if len(v) == 0 {
+ // Nothing to query
+ continue
+ }
+
+ // blocks until it can put a value into the
+ // channel (which has a max queue capacity)
+ sem <- true
+ wg.Add(1)
+ go func(k string, v []string) {
+ rp, kn, err := h.remoteQueryUUIDs(w, req, k, v)
+ mtx.Lock()
+ if err == nil {
+ completeResponses = append(completeResponses, rp...)
+ kind = kn
+ } else {
+ errors = append(errors, err)
+ }
+ mtx.Unlock()
+ wg.Done()
+ <-sem
+ }(k, v)
+ }
+ wg.Wait()
+
+ if len(errors) > 0 {
+ var strerr []string
+ for _, e := range errors {
+ strerr = append(strerr, e.Error())
+ }
+ httpserver.Errors(w, strerr, http.StatusBadGateway)
+ return true
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusOK)
+ itemList := make(map[string]interface{})
+ itemList["items"] = completeResponses
+ itemList["kind"] = kind
+ json.NewEncoder(w).Encode(itemList)
+
+ return true
+}
+
+func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+ m := h.matcher.FindStringSubmatch(req.URL.Path)
+ clusterId := ""
+
+ if len(m) > 0 && m[2] != "" {
+ clusterId = m[2]
+ }
+
+ // Get form parameters from URL and form body (if POST).
+ if err := loadParamsFromForm(req); err != nil {
+ httpserver.Error(w, err.Error(), http.StatusBadRequest)
+ return
+ }
+
+ // Check if the parameters have an explicit cluster_id
+ if req.Form.Get("cluster_id") != "" {
+ clusterId = req.Form.Get("cluster_id")
+ }
+
+ // Handle the POST-as-GET special case (workaround for large
+ // GET requests that potentially exceed maximum URL length,
+ // like multi-object queries where the filter has 100s of
+ // items)
+ effectiveMethod := req.Method
+ if req.Method == "POST" && req.Form.Get("_method") != "" {
+ effectiveMethod = req.Form.Get("_method")
+ }
+
+ if effectiveMethod == "GET" &&
+ clusterId == "" &&
+ req.Form.Get("filters") != "" &&
+ h.handleMultiClusterQuery(w, req, &clusterId) {
+ return
+ }
+
+ if clusterId == "" || clusterId == h.handler.Cluster.ClusterID {
+ h.next.ServeHTTP(w, req)
+ } else {
+ resp, err := h.handler.remoteClusterRequest(clusterId, req)
+ h.handler.proxy.ForwardResponse(w, resp, err)
+ }
+}
+
+type multiClusterQueryResponseCollector struct {
+ responses []map[string]interface{}
+ error error
+ kind string
+ clusterID string
+}
+
+func (c *multiClusterQueryResponseCollector) collectResponse(resp *http.Response,
+ requestError error) (newResponse *http.Response, err error) {
+ if requestError != nil {
+ c.error = requestError
+ return nil, nil
+ }
+
+ defer resp.Body.Close()
+ var loadInto struct {
+ Kind string `json:"kind"`
+ Items []map[string]interface{} `json:"items"`
+ Errors []string `json:"errors"`
+ }
+ err = json.NewDecoder(resp.Body).Decode(&loadInto)
+
+ if err != nil {
+ c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, err)
+ return nil, nil
+ }
+ if resp.StatusCode != http.StatusOK {
+ c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, loadInto.Errors)
+ return nil, nil
+ }
+
+ c.responses = loadInto.Items
+ c.kind = loadInto.Kind
+
+ return nil, nil
+}
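handleMultiClusterQuery above relies on the fact that the first five characters of an Arvados UUID name the cluster that owns the object. A small standalone illustration of that bucketing step (the UUIDs below are made up):

package main

import "fmt"

// groupByCluster splits Arvados-style UUIDs into per-cluster buckets using the
// 5-character cluster prefix, mirroring the split done in handleMultiClusterQuery.
func groupByCluster(uuids []string) map[string][]string {
	buckets := make(map[string][]string)
	for _, u := range uuids {
		if len(u) < 5 {
			continue // not a well-formed UUID; skip it
		}
		prefix := u[0:5]
		buckets[prefix] = append(buckets[prefix], u)
	}
	return buckets
}

func main() {
	uuids := []string{
		"zzzzz-4zz18-aaaaaaaaaaaaaaa",
		"zbbbb-4zz18-bbbbbbbbbbbbbbb",
		"zzzzz-4zz18-ccccccccccccccc",
	}
	for cluster, ids := range groupByCluster(uuids) {
		fmt.Println(cluster, ids)
	}
}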
diff --git a/lib/controller/federation.go b/lib/controller/federation.go
index c5089fa23..03d2f3fab 100644
--- a/lib/controller/federation.go
+++ b/lib/controller/federation.go
@@ -5,12 +5,8 @@
package controller
import (
- "bufio"
"bytes"
- "context"
- "crypto/md5"
"database/sql"
- "encoding/json"
"fmt"
"io"
"io/ioutil"
@@ -18,12 +14,9 @@ import (
"net/url"
"regexp"
"strings"
- "sync"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/auth"
- "git.curoverse.com/arvados.git/sdk/go/httpserver"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
)
var pathPattern = `^/arvados/v1/%s(/([0-9a-z]{5})-%s-[0-9a-z]{15})?(.*)$`
@@ -33,17 +26,6 @@ var containerRequestsRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "container
var collectionRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "collections", "4zz18"))
var collectionByPDHRe = regexp.MustCompile(`^/arvados/v1/collections/([0-9a-fA-F]{32}\+[0-9]+)+$`)
-type genericFederatedRequestHandler struct {
- next http.Handler
- handler *Handler
- matcher *regexp.Regexp
-}
-
-type collectionFederatedRequestHandler struct {
- next http.Handler
- handler *Handler
-}
-
func (h *Handler) remoteClusterRequest(remoteID string, req *http.Request) (*http.Response, error) {
remote, ok := h.Cluster.RemoteClusters[remoteID]
if !ok {
@@ -98,597 +80,6 @@ func loadParamsFromForm(req *http.Request) error {
return nil
}
-type multiClusterQueryResponseCollector struct {
- responses []map[string]interface{}
- error error
- kind string
- clusterID string
-}
-
-func (c *multiClusterQueryResponseCollector) collectResponse(resp *http.Response,
- requestError error) (newResponse *http.Response, err error) {
- if requestError != nil {
- c.error = requestError
- return nil, nil
- }
-
- defer resp.Body.Close()
- var loadInto struct {
- Kind string `json:"kind"`
- Items []map[string]interface{} `json:"items"`
- Errors []string `json:"errors"`
- }
- err = json.NewDecoder(resp.Body).Decode(&loadInto)
-
- if err != nil {
- c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, err)
- return nil, nil
- }
- if resp.StatusCode != http.StatusOK {
- c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, loadInto.Errors)
- return nil, nil
- }
-
- c.responses = loadInto.Items
- c.kind = loadInto.Kind
-
- return nil, nil
-}
-
-func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter,
- req *http.Request,
- clusterID string, uuids []string) (rp []map[string]interface{}, kind string, err error) {
-
- found := make(map[string]bool)
- prev_len_uuids := len(uuids) + 1
- // Loop while
- // (1) there are more uuids to query
- // (2) we're making progress - on each iteration the set of
- // uuids we are expecting for must shrink.
- for len(uuids) > 0 && len(uuids) < prev_len_uuids {
- var remoteReq http.Request
- remoteReq.Header = req.Header
- remoteReq.Method = "POST"
- remoteReq.URL = &url.URL{Path: req.URL.Path}
- remoteParams := make(url.Values)
- remoteParams.Set("_method", "GET")
- remoteParams.Set("count", "none")
- if req.Form.Get("select") != "" {
- remoteParams.Set("select", req.Form.Get("select"))
- }
- content, err := json.Marshal(uuids)
- if err != nil {
- return nil, "", err
- }
- remoteParams["filters"] = []string{fmt.Sprintf(`[["uuid", "in", %s]]`, content)}
- enc := remoteParams.Encode()
- remoteReq.Body = ioutil.NopCloser(bytes.NewBufferString(enc))
-
- rc := multiClusterQueryResponseCollector{clusterID: clusterID}
-
- var resp *http.Response
- if clusterID == h.handler.Cluster.ClusterID {
- resp, err = h.handler.localClusterRequest(&remoteReq)
- } else {
- resp, err = h.handler.remoteClusterRequest(clusterID, &remoteReq)
- }
- rc.collectResponse(resp, err)
-
- if rc.error != nil {
- return nil, "", rc.error
- }
-
- kind = rc.kind
-
- if len(rc.responses) == 0 {
- // We got zero responses, no point in doing
- // another query.
- return rp, kind, nil
- }
-
- rp = append(rp, rc.responses...)
-
- // Go through the responses and determine what was
- // returned. If there are remaining items, loop
- // around and do another request with just the
- // stragglers.
- for _, i := range rc.responses {
- uuid, ok := i["uuid"].(string)
- if ok {
- found[uuid] = true
- }
- }
-
- l := []string{}
- for _, u := range uuids {
- if !found[u] {
- l = append(l, u)
- }
- }
- prev_len_uuids = len(uuids)
- uuids = l
- }
-
- return rp, kind, nil
-}
-
-func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.ResponseWriter,
- req *http.Request, clusterId *string) bool {
-
- var filters [][]interface{}
- err := json.Unmarshal([]byte(req.Form.Get("filters")), &filters)
- if err != nil {
- httpserver.Error(w, err.Error(), http.StatusBadRequest)
- return true
- }
-
- // Split the list of uuids by prefix
- queryClusters := make(map[string][]string)
- expectCount := 0
- for _, filter := range filters {
- if len(filter) != 3 {
- return false
- }
-
- if lhs, ok := filter[0].(string); !ok || lhs != "uuid" {
- return false
- }
-
- op, ok := filter[1].(string)
- if !ok {
- return false
- }
-
- if op == "in" {
- if rhs, ok := filter[2].([]interface{}); ok {
- for _, i := range rhs {
- if u, ok := i.(string); ok {
- *clusterId = u[0:5]
- queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
- expectCount += 1
- }
- }
- }
- } else if op == "=" {
- if u, ok := filter[2].(string); ok {
- *clusterId = u[0:5]
- queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
- expectCount += 1
- }
- } else {
- return false
- }
-
- }
-
- if len(queryClusters) <= 1 {
- // Query does not search for uuids across multiple
- // clusters.
- return false
- }
-
- // Validations
- count := req.Form.Get("count")
- if count != "" && count != `none` && count != `"none"` {
- httpserver.Error(w, "Federated multi-object query must have 'count=none'", http.StatusBadRequest)
- return true
- }
- if req.Form.Get("limit") != "" || req.Form.Get("offset") != "" || req.Form.Get("order") != "" {
- httpserver.Error(w, "Federated multi-object may not provide 'limit', 'offset' or 'order'.", http.StatusBadRequest)
- return true
- }
- if expectCount > h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse() {
- httpserver.Error(w, fmt.Sprintf("Federated multi-object request for %v objects which is more than max page size %v.",
- expectCount, h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse()), http.StatusBadRequest)
- return true
- }
- if req.Form.Get("select") != "" {
- foundUUID := false
- var selects []string
- err := json.Unmarshal([]byte(req.Form.Get("select")), &selects)
- if err != nil {
- httpserver.Error(w, err.Error(), http.StatusBadRequest)
- return true
- }
-
- for _, r := range selects {
- if r == "uuid" {
- foundUUID = true
- break
- }
- }
- if !foundUUID {
- httpserver.Error(w, "Federated multi-object request must include 'uuid' in 'select'", http.StatusBadRequest)
- return true
- }
- }
-
- // Perform concurrent requests to each cluster
-
- // use channel as a semaphore to limit the number of concurrent
- // requests at a time
- sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
- defer close(sem)
- wg := sync.WaitGroup{}
-
- req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
- mtx := sync.Mutex{}
- errors := []error{}
- var completeResponses []map[string]interface{}
- var kind string
-
- for k, v := range queryClusters {
- if len(v) == 0 {
- // Nothing to query
- continue
- }
-
- // blocks until it can put a value into the
- // channel (which has a max queue capacity)
- sem <- true
- wg.Add(1)
- go func(k string, v []string) {
- rp, kn, err := h.remoteQueryUUIDs(w, req, k, v)
- mtx.Lock()
- if err == nil {
- completeResponses = append(completeResponses, rp...)
- kind = kn
- } else {
- errors = append(errors, err)
- }
- mtx.Unlock()
- wg.Done()
- <-sem
- }(k, v)
- }
- wg.Wait()
-
- if len(errors) > 0 {
- var strerr []string
- for _, e := range errors {
- strerr = append(strerr, e.Error())
- }
- httpserver.Errors(w, strerr, http.StatusBadGateway)
- return true
- }
-
- w.Header().Set("Content-Type", "application/json")
- w.WriteHeader(http.StatusOK)
- itemList := make(map[string]interface{})
- itemList["items"] = completeResponses
- itemList["kind"] = kind
- json.NewEncoder(w).Encode(itemList)
-
- return true
-}
-
-func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
- m := h.matcher.FindStringSubmatch(req.URL.Path)
- clusterId := ""
-
- if len(m) > 0 && m[2] != "" {
- clusterId = m[2]
- }
-
- // Get form parameters from URL and form body (if POST).
- if err := loadParamsFromForm(req); err != nil {
- httpserver.Error(w, err.Error(), http.StatusBadRequest)
- return
- }
-
- // Check if the parameters have an explicit cluster_id
- if req.Form.Get("cluster_id") != "" {
- clusterId = req.Form.Get("cluster_id")
- }
-
- // Handle the POST-as-GET special case (workaround for large
- // GET requests that potentially exceed maximum URL length,
- // like multi-object queries where the filter has 100s of
- // items)
- effectiveMethod := req.Method
- if req.Method == "POST" && req.Form.Get("_method") != "" {
- effectiveMethod = req.Form.Get("_method")
- }
-
- if effectiveMethod == "GET" &&
- clusterId == "" &&
- req.Form.Get("filters") != "" &&
- h.handleMultiClusterQuery(w, req, &clusterId) {
- return
- }
-
- if clusterId == "" || clusterId == h.handler.Cluster.ClusterID {
- h.next.ServeHTTP(w, req)
- } else {
- resp, err := h.handler.remoteClusterRequest(clusterId, req)
- h.handler.proxy.ForwardResponse(w, resp, err)
- }
-}
-
-func rewriteSignatures(clusterID string, expectHash string,
- resp *http.Response, requestError error) (newResponse *http.Response, err error) {
-
- if requestError != nil {
- return resp, requestError
- }
-
- if resp.StatusCode != 200 {
- return resp, nil
- }
-
- originalBody := resp.Body
- defer originalBody.Close()
-
- var col arvados.Collection
- err = json.NewDecoder(resp.Body).Decode(&col)
- if err != nil {
- return nil, err
- }
-
- // rewriting signatures will make manifest text 5-10% bigger so calculate
- // capacity accordingly
- updatedManifest := bytes.NewBuffer(make([]byte, 0, int(float64(len(col.ManifestText))*1.1)))
-
- hasher := md5.New()
- mw := io.MultiWriter(hasher, updatedManifest)
- sz := 0
-
- scanner := bufio.NewScanner(strings.NewReader(col.ManifestText))
- scanner.Buffer(make([]byte, 1048576), len(col.ManifestText))
- for scanner.Scan() {
- line := scanner.Text()
- tokens := strings.Split(line, " ")
- if len(tokens) < 3 {
- return nil, fmt.Errorf("Invalid stream (<3 tokens): %q", line)
- }
-
- n, err := mw.Write([]byte(tokens[0]))
- if err != nil {
- return nil, fmt.Errorf("Error updating manifest: %v", err)
- }
- sz += n
- for _, token := range tokens[1:] {
- n, err = mw.Write([]byte(" "))
- if err != nil {
- return nil, fmt.Errorf("Error updating manifest: %v", err)
- }
- sz += n
-
- m := keepclient.SignedLocatorRe.FindStringSubmatch(token)
- if m != nil {
- // Rewrite the block signature to be a remote signature
- _, err = fmt.Fprintf(updatedManifest, "%s%s%s+R%s-%s%s", m[1], m[2], m[3], clusterID, m[5][2:], m[8])
- if err != nil {
- return nil, fmt.Errorf("Error updating manifest: %v", err)
- }
-
- // for hash checking, ignore signatures
- n, err = fmt.Fprintf(hasher, "%s%s", m[1], m[2])
- if err != nil {
- return nil, fmt.Errorf("Error updating manifest: %v", err)
- }
- sz += n
- } else {
- n, err = mw.Write([]byte(token))
- if err != nil {
- return nil, fmt.Errorf("Error updating manifest: %v", err)
- }
- sz += n
- }
- }
- n, err = mw.Write([]byte("\n"))
- if err != nil {
- return nil, fmt.Errorf("Error updating manifest: %v", err)
- }
- sz += n
- }
-
- // Check that expected hash is consistent with
- // portable_data_hash field of the returned record
- if expectHash == "" {
- expectHash = col.PortableDataHash
- } else if expectHash != col.PortableDataHash {
- return nil, fmt.Errorf("portable_data_hash %q on returned record did not match expected hash %q ", expectHash, col.PortableDataHash)
- }
-
- // Certify that the computed hash of the manifest_text matches our expectation
- sum := hasher.Sum(nil)
- computedHash := fmt.Sprintf("%x+%v", sum, sz)
- if computedHash != expectHash {
- return nil, fmt.Errorf("Computed manifest_text hash %q did not match expected hash %q", computedHash, expectHash)
- }
-
- col.ManifestText = updatedManifest.String()
-
- newbody, err := json.Marshal(col)
- if err != nil {
- return nil, err
- }
-
- buf := bytes.NewBuffer(newbody)
- resp.Body = ioutil.NopCloser(buf)
- resp.ContentLength = int64(buf.Len())
- resp.Header.Set("Content-Length", fmt.Sprintf("%v", buf.Len()))
-
- return resp, nil
-}
-
-func filterLocalClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
- if requestError != nil {
- return resp, requestError
- }
-
- if resp.StatusCode == 404 {
- // Suppress returning this result, because we want to
- // search the federation.
- return nil, nil
- }
- return resp, nil
-}
-
-type searchRemoteClusterForPDH struct {
- pdh string
- remoteID string
- mtx *sync.Mutex
- sentResponse *bool
- sharedContext *context.Context
- cancelFunc func()
- errors *[]string
- statusCode *int
-}
-
-func (s *searchRemoteClusterForPDH) filterRemoteClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
- s.mtx.Lock()
- defer s.mtx.Unlock()
-
- if *s.sentResponse {
- // Another request already returned a response
- return nil, nil
- }
-
- if requestError != nil {
- *s.errors = append(*s.errors, fmt.Sprintf("Request error contacting %q: %v", s.remoteID, requestError))
- // Record the error and suppress response
- return nil, nil
- }
-
- if resp.StatusCode != 200 {
- // Suppress returning unsuccessful result. Maybe
- // another request will find it.
- // TODO collect and return error responses.
- *s.errors = append(*s.errors, fmt.Sprintf("Response from %q: %v", s.remoteID, resp.Status))
- if resp.StatusCode != 404 {
- // Got a non-404 error response, convert into BadGateway
- *s.statusCode = http.StatusBadGateway
- }
- return nil, nil
- }
-
- s.mtx.Unlock()
-
- // This reads the response body. We don't want to hold the
- // lock while doing this because other remote requests could
- // also have made it to this point, and we don't want a
- // slow response holding the lock to block a faster response
- // that is waiting on the lock.
- newResponse, err = rewriteSignatures(s.remoteID, s.pdh, resp, nil)
-
- s.mtx.Lock()
-
- if *s.sentResponse {
- // Another request already returned a response
- return nil, nil
- }
-
- if err != nil {
- // Suppress returning unsuccessful result. Maybe
- // another request will be successful.
- *s.errors = append(*s.errors, fmt.Sprintf("Error parsing response from %q: %v", s.remoteID, err))
- return nil, nil
- }
-
- // We have a successful response. Suppress/cancel all the
- // other requests/responses.
- *s.sentResponse = true
- s.cancelFunc()
-
- return newResponse, nil
-}
-
-func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
- if req.Method != "GET" {
- // Only handle GET requests right now
- h.next.ServeHTTP(w, req)
- return
- }
-
- m := collectionByPDHRe.FindStringSubmatch(req.URL.Path)
- if len(m) != 2 {
- // Not a collection PDH GET request
- m = collectionRe.FindStringSubmatch(req.URL.Path)
- clusterId := ""
-
- if len(m) > 0 {
- clusterId = m[2]
- }
-
- if clusterId != "" && clusterId != h.handler.Cluster.ClusterID {
- // request for remote collection by uuid
- resp, err := h.handler.remoteClusterRequest(clusterId, req)
- newResponse, err := rewriteSignatures(clusterId, "", resp, err)
- h.handler.proxy.ForwardResponse(w, newResponse, err)
- return
- }
- // not a collection UUID request, or it is a request
- // for a local UUID, either way, continue down the
- // handler stack.
- h.next.ServeHTTP(w, req)
- return
- }
-
- // Request for collection by PDH. Search the federation.
-
- // First, query the local cluster.
- resp, err := h.handler.localClusterRequest(req)
- newResp, err := filterLocalClusterResponse(resp, err)
- if newResp != nil || err != nil {
- h.handler.proxy.ForwardResponse(w, newResp, err)
- return
- }
-
- sharedContext, cancelFunc := context.WithCancel(req.Context())
- defer cancelFunc()
- req = req.WithContext(sharedContext)
-
- // Create a goroutine for each cluster in the
- // RemoteClusters map. The first valid result gets
- // returned to the client. When that happens, all
- // other outstanding requests are cancelled or
- // suppressed.
- sentResponse := false
- mtx := sync.Mutex{}
- wg := sync.WaitGroup{}
- var errors []string
- var errorCode int = 404
-
- // use channel as a semaphore to limit the number of concurrent
- // requests at a time
- sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
- defer close(sem)
- for remoteID := range h.handler.Cluster.RemoteClusters {
- if remoteID == h.handler.Cluster.ClusterID {
- // No need to query local cluster again
- continue
- }
- // blocks until it can put a value into the
- // channel (which has a max queue capacity)
- sem <- true
- if sentResponse {
- break
- }
- search := &searchRemoteClusterForPDH{m[1], remoteID, &mtx, &sentResponse,
- &sharedContext, cancelFunc, &errors, &errorCode}
- wg.Add(1)
- go func() {
- resp, err := h.handler.remoteClusterRequest(search.remoteID, req)
- newResp, err := search.filterRemoteClusterResponse(resp, err)
- if newResp != nil || err != nil {
- h.handler.proxy.ForwardResponse(w, newResp, err)
- }
- wg.Done()
- <-sem
- }()
- }
- wg.Wait()
-
- if sentResponse {
- return
- }
-
- // No successful responses, so return the error
- httpserver.Errors(w, errors, errorCode)
-}
-
func (h *Handler) setupProxyRemoteCluster(next http.Handler) http.Handler {
mux := http.NewServeMux()
mux.Handle("/arvados/v1/workflows", &genericFederatedRequestHandler{next, h, wfRe})
commit 24d191a32b067828515459f5f9fa4f49300b543e
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Thu Oct 18 16:08:28 2018 -0400
14262: Refactoring proxy
Split proxy.Do() into ForwardRequest() and ForwardResponse().
Inversion of control eliminates the need for the "filter" callback, since the
caller can now modify the response in between the calls to
ForwardRequest() and ForwardResponse().
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
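A rough standalone sketch of the shape this gives the proxy code. The backend URL and the X-Proxied header below are placeholders, not part of the Arvados controller:

package main

import (
	"io"
	"net/http"
)

func forwardRequest(req *http.Request, client *http.Client) (*http.Response, error) {
	out, err := http.NewRequest(req.Method, "http://backend.example"+req.URL.Path, req.Body)
	if err != nil {
		return nil, err
	}
	out.Header = req.Header
	return client.Do(out)
}

func forwardResponse(w http.ResponseWriter, resp *http.Response, err error) (int64, error) {
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadGateway)
		return 0, nil
	}
	defer resp.Body.Close()
	for k, vv := range resp.Header {
		for _, v := range vv {
			w.Header().Add(k, v)
		}
	}
	w.WriteHeader(resp.StatusCode)
	return io.Copy(w, resp.Body)
}

func handler(w http.ResponseWriter, req *http.Request) {
	resp, err := forwardRequest(req, http.DefaultClient)
	if err == nil {
		// The caller gets a chance to adjust the response here, before any
		// bytes are written to the client; no filter callback is needed.
		resp.Header.Set("X-Proxied", "yes")
	}
	forwardResponse(w, resp, err)
}

func main() {
	http.HandleFunc("/", handler)
	http.ListenAndServe(":8080", nil)
}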
diff --git a/lib/controller/federation.go b/lib/controller/federation.go
index f30365574..c5089fa23 100644
--- a/lib/controller/federation.go
+++ b/lib/controller/federation.go
@@ -44,17 +44,10 @@ type collectionFederatedRequestHandler struct {
handler *Handler
}
-func (h *Handler) remoteClusterRequest(remoteID string, w http.ResponseWriter, req *http.Request, filter ResponseFilter) {
+func (h *Handler) remoteClusterRequest(remoteID string, req *http.Request) (*http.Response, error) {
remote, ok := h.Cluster.RemoteClusters[remoteID]
if !ok {
- err := fmt.Errorf("no proxy available for cluster %v", remoteID)
- if filter != nil {
- _, err = filter(nil, err)
- }
- if err != nil {
- httpserver.Error(w, err.Error(), http.StatusNotFound)
- }
- return
+ return nil, HTTPError{fmt.Sprintf("no proxy available for cluster %v", remoteID), http.StatusNotFound}
}
scheme := remote.Scheme
if scheme == "" {
@@ -62,13 +55,7 @@ func (h *Handler) remoteClusterRequest(remoteID string, w http.ResponseWriter, r
}
saltedReq, err := h.saltAuthToken(req, remoteID)
if err != nil {
- if filter != nil {
- _, err = filter(nil, err)
- }
- if err != nil {
- httpserver.Error(w, err.Error(), http.StatusBadRequest)
- }
- return
+ return nil, err
}
urlOut := &url.URL{
Scheme: scheme,
@@ -81,7 +68,7 @@ func (h *Handler) remoteClusterRequest(remoteID string, w http.ResponseWriter, r
if remote.Insecure {
client = h.insecureClient
}
- h.proxy.Do(w, saltedReq, urlOut, client, filter)
+ return h.proxy.ForwardRequest(saltedReq, urlOut, client)
}
// Buffer request body, parse form parameters in request, and then
@@ -179,13 +166,14 @@ func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter,
rc := multiClusterQueryResponseCollector{clusterID: clusterID}
+ var resp *http.Response
if clusterID == h.handler.Cluster.ClusterID {
- h.handler.localClusterRequest(w, &remoteReq,
- rc.collectResponse)
+ resp, err = h.handler.localClusterRequest(&remoteReq)
} else {
- h.handler.remoteClusterRequest(clusterID, w, &remoteReq,
- rc.collectResponse)
+ resp, err = h.handler.remoteClusterRequest(clusterID, &remoteReq)
}
+ rc.collectResponse(resp, err)
+
if rc.error != nil {
return nil, "", rc.error
}
@@ -412,16 +400,14 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h
if clusterId == "" || clusterId == h.handler.Cluster.ClusterID {
h.next.ServeHTTP(w, req)
} else {
- h.handler.remoteClusterRequest(clusterId, w, req, nil)
+ resp, err := h.handler.remoteClusterRequest(clusterId, req)
+ h.handler.proxy.ForwardResponse(w, resp, err)
}
}
-type rewriteSignaturesClusterId struct {
- clusterID string
- expectHash string
-}
+func rewriteSignatures(clusterID string, expectHash string,
+ resp *http.Response, requestError error) (newResponse *http.Response, err error) {
-func (rw rewriteSignaturesClusterId) rewriteSignatures(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
if requestError != nil {
return resp, requestError
}
@@ -471,7 +457,7 @@ func (rw rewriteSignaturesClusterId) rewriteSignatures(resp *http.Response, requ
m := keepclient.SignedLocatorRe.FindStringSubmatch(token)
if m != nil {
// Rewrite the block signature to be a remote signature
- _, err = fmt.Fprintf(updatedManifest, "%s%s%s+R%s-%s%s", m[1], m[2], m[3], rw.clusterID, m[5][2:], m[8])
+ _, err = fmt.Fprintf(updatedManifest, "%s%s%s+R%s-%s%s", m[1], m[2], m[3], clusterID, m[5][2:], m[8])
if err != nil {
return nil, fmt.Errorf("Error updating manifest: %v", err)
}
@@ -499,17 +485,17 @@ func (rw rewriteSignaturesClusterId) rewriteSignatures(resp *http.Response, requ
// Check that expected hash is consistent with
// portable_data_hash field of the returned record
- if rw.expectHash == "" {
- rw.expectHash = col.PortableDataHash
- } else if rw.expectHash != col.PortableDataHash {
- return nil, fmt.Errorf("portable_data_hash %q on returned record did not match expected hash %q ", rw.expectHash, col.PortableDataHash)
+ if expectHash == "" {
+ expectHash = col.PortableDataHash
+ } else if expectHash != col.PortableDataHash {
+ return nil, fmt.Errorf("portable_data_hash %q on returned record did not match expected hash %q ", expectHash, col.PortableDataHash)
}
// Certify that the computed hash of the manifest_text matches our expectation
sum := hasher.Sum(nil)
computedHash := fmt.Sprintf("%x+%v", sum, sz)
- if computedHash != rw.expectHash {
- return nil, fmt.Errorf("Computed manifest_text hash %q did not match expected hash %q", computedHash, rw.expectHash)
+ if computedHash != expectHash {
+ return nil, fmt.Errorf("Computed manifest_text hash %q did not match expected hash %q", computedHash, expectHash)
}
col.ManifestText = updatedManifest.String()
@@ -585,7 +571,7 @@ func (s *searchRemoteClusterForPDH) filterRemoteClusterResponse(resp *http.Respo
// also have made it to this point, and we don't want a
// slow response holding the lock to block a faster response
// that is waiting on the lock.
- newResponse, err = rewriteSignaturesClusterId{s.remoteID, s.pdh}.rewriteSignatures(resp, nil)
+ newResponse, err = rewriteSignatures(s.remoteID, s.pdh, resp, nil)
s.mtx.Lock()
@@ -628,8 +614,9 @@ func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req
if clusterId != "" && clusterId != h.handler.Cluster.ClusterID {
// request for remote collection by uuid
- h.handler.remoteClusterRequest(clusterId, w, req,
- rewriteSignaturesClusterId{clusterId, ""}.rewriteSignatures)
+ resp, err := h.handler.remoteClusterRequest(clusterId, req)
+ newResponse, err := rewriteSignatures(clusterId, "", resp, err)
+ h.handler.proxy.ForwardResponse(w, newResponse, err)
return
}
// not a collection UUID request, or it is a request
@@ -642,7 +629,10 @@ func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req
// Request for collection by PDH. Search the federation.
// First, query the local cluster.
- if h.handler.localClusterRequest(w, req, filterLocalClusterResponse) {
+ resp, err := h.handler.localClusterRequest(req)
+ newResp, err := filterLocalClusterResponse(resp, err)
+ if newResp != nil || err != nil {
+ h.handler.proxy.ForwardResponse(w, newResp, err)
return
}
@@ -680,7 +670,11 @@ func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req
&sharedContext, cancelFunc, &errors, &errorCode}
wg.Add(1)
go func() {
- h.handler.remoteClusterRequest(search.remoteID, w, req, search.filterRemoteClusterResponse)
+ resp, err := h.handler.remoteClusterRequest(search.remoteID, req)
+ newResp, err := search.filterRemoteClusterResponse(resp, err)
+ if newResp != nil || err != nil {
+ h.handler.proxy.ForwardResponse(w, newResp, err)
+ }
wg.Done()
<-sem
}()
diff --git a/lib/controller/handler.go b/lib/controller/handler.go
index 0c31815cb..5e9012949 100644
--- a/lib/controller/handler.go
+++ b/lib/controller/handler.go
@@ -121,14 +121,10 @@ func prepend(next http.Handler, middleware middlewareFunc) http.Handler {
})
}
-// localClusterRequest sets up a request so it can be proxied to the
-// local API server using proxy.Do(). Returns true if a response was
-// written, false if not.
-func (h *Handler) localClusterRequest(w http.ResponseWriter, req *http.Request, filter ResponseFilter) bool {
+func (h *Handler) localClusterRequest(req *http.Request) (*http.Response, error) {
urlOut, insecure, err := findRailsAPI(h.Cluster, h.NodeProfile)
if err != nil {
- httpserver.Error(w, err.Error(), http.StatusInternalServerError)
- return true
+ return nil, err
}
urlOut = &url.URL{
Scheme: urlOut.Scheme,
@@ -141,12 +137,14 @@ func (h *Handler) localClusterRequest(w http.ResponseWriter, req *http.Request,
if insecure {
client = h.insecureClient
}
- return h.proxy.Do(w, req, urlOut, client, filter)
+ return h.proxy.ForwardRequest(req, urlOut, client)
}
func (h *Handler) proxyRailsAPI(w http.ResponseWriter, req *http.Request, next http.Handler) {
- if !h.localClusterRequest(w, req, nil) && next != nil {
- next.ServeHTTP(w, req)
+ resp, err := h.localClusterRequest(req)
+ n, err := h.proxy.ForwardResponse(w, resp, err)
+ if err != nil {
+ httpserver.Logger(req).WithError(err).WithField("bytesCopied", n).Error("error copying response body")
}
}
diff --git a/lib/controller/proxy.go b/lib/controller/proxy.go
index 951cb9d25..9aecdc1b2 100644
--- a/lib/controller/proxy.go
+++ b/lib/controller/proxy.go
@@ -19,6 +19,15 @@ type proxy struct {
RequestTimeout time.Duration
}
+type HTTPError struct {
+ Message string
+ Code int
+}
+
+func (h HTTPError) Error() string {
+ return h.Message
+}
+
// headers that shouldn't be forwarded when proxying. See
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers
var dropHeaders = map[string]bool{
@@ -36,15 +45,11 @@ var dropHeaders = map[string]bool{
type ResponseFilter func(*http.Response, error) (*http.Response, error)
-// Do sends a request, passes the result to the filter (if provided)
-// and then if the result is not suppressed by the filter, sends the
-// request to the ResponseWriter. Returns true if a response was written,
-// false if not.
-func (p *proxy) Do(w http.ResponseWriter,
+// Forward a request to the downstream service, and return the response or error.
+func (p *proxy) ForwardRequest(
reqIn *http.Request,
urlOut *url.URL,
- client *http.Client,
- filter ResponseFilter) bool {
+ client *http.Client) (*http.Response, error) {
// Copy headers from incoming request, then add/replace proxy
// headers like Via and X-Forwarded-For.
@@ -79,50 +84,26 @@ func (p *proxy) Do(w http.ResponseWriter,
Body: reqIn.Body,
}).WithContext(ctx)
- resp, err := client.Do(reqOut)
- if filter == nil && err != nil {
- httpserver.Error(w, err.Error(), http.StatusBadGateway)
- return true
- }
-
- // make sure original response body gets closed
- var originalBody io.ReadCloser
- if resp != nil {
- originalBody = resp.Body
- if originalBody != nil {
- defer originalBody.Close()
- }
- }
-
- if filter != nil {
- resp, err = filter(resp, err)
+ return client.Do(reqOut)
+}
- if err != nil {
+// Copy a response (or error) to the upstream client
+func (p *proxy) ForwardResponse(w http.ResponseWriter, resp *http.Response, err error) (int64, error) {
+ if err != nil {
+ if he, ok := err.(HTTPError); ok {
+ httpserver.Error(w, he.Message, he.Code)
+ } else {
httpserver.Error(w, err.Error(), http.StatusBadGateway)
- return true
- }
- if resp == nil {
- // filter() returned a nil response, this means suppress
- // writing a response, for the case where there might
- // be multiple response writers.
- return false
- }
-
- // the filter gave us a new response body, make sure that gets closed too.
- if resp.Body != originalBody {
- defer resp.Body.Close()
}
+ return 0, nil
}
+ defer resp.Body.Close()
for k, v := range resp.Header {
for _, v := range v {
w.Header().Add(k, v)
}
}
w.WriteHeader(resp.StatusCode)
- n, err := io.Copy(w, resp.Body)
- if err != nil {
- httpserver.Logger(reqIn).WithError(err).WithField("bytesCopied", n).Error("error copying response body")
- }
- return true
+ return io.Copy(w, resp.Body)
}
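The new HTTPError type is what lets ForwardResponse report "no proxy available" as a 404 instead of collapsing every failure into a 502. A condensed illustration of that typed-error pattern on its own (the handler wiring here is hypothetical):

package main

import (
	"fmt"
	"net/http"
)

type HTTPError struct {
	Message string
	Code    int
}

func (h HTTPError) Error() string { return h.Message }

func writeError(w http.ResponseWriter, err error) {
	if he, ok := err.(HTTPError); ok {
		http.Error(w, he.Message, he.Code) // preserve the intended status
		return
	}
	http.Error(w, err.Error(), http.StatusBadGateway) // generic upstream failure
}

func main() {
	http.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
		writeError(w, HTTPError{Message: "no proxy available for cluster zzzzz", Code: http.StatusNotFound})
	})
	fmt.Println("listening on :8080")
	http.ListenAndServe(":8080", nil)
}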
commit 47b74a9963a3910dede189378c5b387bf92e81d2
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Thu Oct 18 14:34:49 2018 -0400
14262: Fix bug moving api_token to header
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/lib/controller/federation.go b/lib/controller/federation.go
index e5c56bd83..f30365574 100644
--- a/lib/controller/federation.go
+++ b/lib/controller/federation.go
@@ -14,7 +14,6 @@ import (
"fmt"
"io"
"io/ioutil"
- "log"
"net/http"
"net/url"
"regexp"
@@ -61,7 +60,7 @@ func (h *Handler) remoteClusterRequest(remoteID string, w http.ResponseWriter, r
if scheme == "" {
scheme = "https"
}
- req, err := h.saltAuthToken(req, remoteID)
+ saltedReq, err := h.saltAuthToken(req, remoteID)
if err != nil {
if filter != nil {
_, err = filter(nil, err)
@@ -74,15 +73,15 @@ func (h *Handler) remoteClusterRequest(remoteID string, w http.ResponseWriter, r
urlOut := &url.URL{
Scheme: scheme,
Host: remote.Host,
- Path: req.URL.Path,
- RawPath: req.URL.RawPath,
- RawQuery: req.URL.RawQuery,
+ Path: saltedReq.URL.Path,
+ RawPath: saltedReq.URL.RawPath,
+ RawQuery: saltedReq.URL.RawQuery,
}
client := h.secureClient
if remote.Insecure {
client = h.insecureClient
}
- h.proxy.Do(w, req, urlOut, client, filter)
+ h.proxy.Do(w, saltedReq, urlOut, client, filter)
}
// Buffer request body, parse form parameters in request, and then
@@ -777,7 +776,6 @@ func (h *Handler) saltAuthToken(req *http.Request, remote string) (updatedReq *h
token, err := auth.SaltToken(creds.Tokens[0], remote)
- log.Printf("Salting %q %q to get %q %q", creds.Tokens[0], remote, token, err)
if err == auth.ErrObsoleteToken {
// If the token exists in our own database, salt it
// for the remote. Otherwise, assume it was issued by
@@ -801,14 +799,11 @@ func (h *Handler) saltAuthToken(req *http.Request, remote string) (updatedReq *h
}
updatedReq.Header = http.Header{}
for k, v := range req.Header {
- if k == "Authorization" {
- updatedReq.Header[k] = []string{"Bearer " + token}
- } else {
+ if k != "Authorization" {
updatedReq.Header[k] = v
}
}
-
- log.Printf("Salted %q %q to get %q", creds.Tokens[0], remote, token)
+ updatedReq.Header.Set("Authorization", "Bearer "+token)
// Remove api_token=... from the query string, in case we
// end up forwarding the request.
commit ef1b4fb55cbc0baabe73b526a4f1a05cefef0c38
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Tue Oct 16 15:43:12 2018 -0400
14262: Use container token for access to load Docker image
Previously used Dispatcher token, which created a security race
condition (you couldn't set a container image that you didn't have
access to, but if your access was revoked in the meantime, the
container would still run).
Also tweaked the API server to allow a PDH for the container image spec
with no further checking (so the API server doesn't have to go out and
search the federation). This is no longer a security hazard since it
is now using a user token and not the dispatcher token.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/services/api/app/models/container.rb b/services/api/app/models/container.rb
index 0d8453174..7d8cc00f2 100644
--- a/services/api/app/models/container.rb
+++ b/services/api/app/models/container.rb
@@ -248,6 +248,12 @@ class Container < ArvadosModel
def self.resolve_container_image(container_image)
coll = Collection.for_latest_docker_image(container_image)
if !coll
+ # Allow a bare PDH without any additional checking; otherwise
+ # federated container requests won't work.
+ if loc = Keep::Locator.parse(container_image)
+ loc.strip_hints!
+ return loc.to_s
+ end
raise ArvadosModel::UnresolvableContainerError.new "docker image #{container_image.inspect} not found"
end
coll.portable_data_hash
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 27136b452..d055106d3 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -122,7 +122,7 @@ type ContainerRunner struct {
SigChan chan os.Signal
ArvMountExit chan error
SecretMounts map[string]arvados.Mount
- MkArvClient func(token string) (IArvadosClient, error)
+ MkArvClient func(token string) (IArvadosClient, IKeepClient, error)
finalState string
parentTemp string
@@ -237,8 +237,17 @@ func (runner *ContainerRunner) LoadImage() (err error) {
runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.Container.ContainerImage)
+ tok, err := runner.ContainerToken()
+ if err != nil {
+ return fmt.Errorf("While getting container token (LoadImage): %v", err)
+ }
+ arvClient, kc, err := runner.MkArvClient(tok)
+ if err != nil {
+ return fmt.Errorf("While creating arv client (LoadImage): %v", err)
+ }
+
var collection arvados.Collection
- err = runner.ArvClient.Get("collections", runner.Container.ContainerImage, nil, &collection)
+ err = arvClient.Get("collections", runner.Container.ContainerImage, nil, &collection)
if err != nil {
return fmt.Errorf("While getting container image collection: %v", err)
}
@@ -259,7 +268,7 @@ func (runner *ContainerRunner) LoadImage() (err error) {
runner.CrunchLog.Print("Loading Docker image from keep")
var readCloser io.ReadCloser
- readCloser, err = runner.Kc.ManifestFileReader(manifest, img)
+ readCloser, err = kc.ManifestFileReader(manifest, img)
if err != nil {
return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
}
@@ -281,7 +290,7 @@ func (runner *ContainerRunner) LoadImage() (err error) {
runner.ContainerConfig.Image = imageID
- runner.Kc.ClearBlockCache()
+ kc.ClearBlockCache()
return nil
}
@@ -1679,7 +1688,7 @@ func (runner *ContainerRunner) fetchContainerRecord() error {
return fmt.Errorf("error getting container token: %v", err)
}
- containerClient, err := runner.MkArvClient(containerToken)
+ containerClient, _, err := runner.MkArvClient(containerToken)
if err != nil {
return fmt.Errorf("error creating container API client: %v", err)
}
@@ -1719,13 +1728,17 @@ func NewContainerRunner(client *arvados.Client, api IArvadosClient, kc IKeepClie
}
return ps, nil
}
- cr.MkArvClient = func(token string) (IArvadosClient, error) {
+ cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, error) {
cl, err := arvadosclient.MakeArvadosClient()
if err != nil {
- return nil, err
+ return nil, nil, err
}
cl.ApiToken = token
- return cl, nil
+ kc, err := keepclient.MakeKeepClient(cl)
+ if err != nil {
+ return nil, nil, err
+ }
+ return cl, kc, nil
}
var err error
cr.LogCollection, err = (&arvados.Collection{}).FileSystem(cr.client, cr.Kc)
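The revised MkArvClient hook boils down to: build an API client, swap in the container's token, then derive a Keep client from it. A minimal sketch of that sequence outside crunch-run (the token string is a placeholder, and error handling is reduced to log.Fatal):

package main

import (
	"log"

	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
	"git.curoverse.com/arvados.git/sdk/go/keepclient"
)

func main() {
	cl, err := arvadosclient.MakeArvadosClient()
	if err != nil {
		log.Fatal(err)
	}
	// All subsequent API calls are made as the container, not the dispatcher.
	cl.ApiToken = "example-container-token"

	kc, err := keepclient.MakeKeepClient(cl)
	if err != nil {
		log.Fatal(err)
	}
	// kc now reads blocks with the container's (revocable) permissions.
	_ = kc
}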
commit 761f243b75fe576eadf0aa44e8b1924ca9846ed7
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Tue Oct 16 14:51:12 2018 -0400
14262: saltAuthToken returns copy of request object
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/lib/controller/federation.go b/lib/controller/federation.go
index 5c6f6bf7a..e5c56bd83 100644
--- a/lib/controller/federation.go
+++ b/lib/controller/federation.go
@@ -14,6 +14,7 @@ import (
"fmt"
"io"
"io/ioutil"
+ "log"
"net/http"
"net/url"
"regexp"
@@ -47,16 +48,27 @@ type collectionFederatedRequestHandler struct {
func (h *Handler) remoteClusterRequest(remoteID string, w http.ResponseWriter, req *http.Request, filter ResponseFilter) {
remote, ok := h.Cluster.RemoteClusters[remoteID]
if !ok {
- httpserver.Error(w, "no proxy available for cluster "+remoteID, http.StatusNotFound)
+ err := fmt.Errorf("no proxy available for cluster %v", remoteID)
+ if filter != nil {
+ _, err = filter(nil, err)
+ }
+ if err != nil {
+ httpserver.Error(w, err.Error(), http.StatusNotFound)
+ }
return
}
scheme := remote.Scheme
if scheme == "" {
scheme = "https"
}
- err := h.saltAuthToken(req, remoteID)
+ req, err := h.saltAuthToken(req, remoteID)
if err != nil {
- httpserver.Error(w, err.Error(), http.StatusBadRequest)
+ if filter != nil {
+ _, err = filter(nil, err)
+ }
+ if err != nil {
+ httpserver.Error(w, err.Error(), http.StatusBadRequest)
+ }
return
}
urlOut := &url.URL{
@@ -655,6 +667,10 @@ func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req
sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
defer close(sem)
for remoteID := range h.handler.Cluster.RemoteClusters {
+ if remoteID == h.handler.Cluster.ClusterID {
+ // No need to query local cluster again
+ continue
+ }
// blocks until it can put a value into the
// channel (which has a max queue capacity)
sem <- true
@@ -728,28 +744,40 @@ func (h *Handler) validateAPItoken(req *http.Request, user *CurrentUser) error {
// Extract the auth token supplied in req, and replace it with a
// salted token for the remote cluster.
-func (h *Handler) saltAuthToken(req *http.Request, remote string) error {
+func (h *Handler) saltAuthToken(req *http.Request, remote string) (updatedReq *http.Request, err error) {
+ updatedReq = (&http.Request{
+ Method: req.Method,
+ URL: req.URL,
+ Header: req.Header,
+ Body: req.Body,
+ ContentLength: req.ContentLength,
+ Host: req.Host,
+ }).WithContext(req.Context())
+
creds := auth.NewCredentials()
- creds.LoadTokensFromHTTPRequest(req)
- if len(creds.Tokens) == 0 && req.Header.Get("Content-Type") == "application/x-www-form-encoded" {
+ creds.LoadTokensFromHTTPRequest(updatedReq)
+ if len(creds.Tokens) == 0 && updatedReq.Header.Get("Content-Type") == "application/x-www-form-encoded" {
// Override ParseForm's 10MiB limit by ensuring
// req.Body is a *http.maxBytesReader.
- req.Body = http.MaxBytesReader(nil, req.Body, 1<<28) // 256MiB. TODO: use MaxRequestSize from discovery doc or config.
- if err := creds.LoadTokensFromHTTPRequestBody(req); err != nil {
- return err
+ updatedReq.Body = http.MaxBytesReader(nil, updatedReq.Body, 1<<28) // 256MiB. TODO: use MaxRequestSize from discovery doc or config.
+ if err := creds.LoadTokensFromHTTPRequestBody(updatedReq); err != nil {
+ return nil, err
}
// Replace req.Body with a buffer that re-encodes the
// form without api_token, in case we end up
// forwarding the request.
- if req.PostForm != nil {
- req.PostForm.Del("api_token")
+ if updatedReq.PostForm != nil {
+ updatedReq.PostForm.Del("api_token")
}
- req.Body = ioutil.NopCloser(bytes.NewBufferString(req.PostForm.Encode()))
+ updatedReq.Body = ioutil.NopCloser(bytes.NewBufferString(updatedReq.PostForm.Encode()))
}
if len(creds.Tokens) == 0 {
- return nil
+ return updatedReq, nil
}
+
token, err := auth.SaltToken(creds.Tokens[0], remote)
+
+ log.Printf("Salting %q %q to get %q %q", creds.Tokens[0], remote, token, err)
if err == auth.ErrObsoleteToken {
// If the token exists in our own database, salt it
// for the remote. Otherwise, assume it was issued by
@@ -760,26 +788,41 @@ func (h *Handler) saltAuthToken(req *http.Request, remote string) error {
// Not ours; pass through unmodified.
token = currentUser.Authorization.APIToken
} else if err != nil {
- return err
+ return nil, err
} else {
// Found; make V2 version and salt it.
token, err = auth.SaltToken(currentUser.Authorization.TokenV2(), remote)
if err != nil {
- return err
+ return nil, err
}
}
} else if err != nil {
- return err
+ return nil, err
+ }
+ updatedReq.Header = http.Header{}
+ for k, v := range req.Header {
+ if k == "Authorization" {
+ updatedReq.Header[k] = []string{"Bearer " + token}
+ } else {
+ updatedReq.Header[k] = v
+ }
}
- req.Header.Set("Authorization", "Bearer "+token)
+
+ log.Printf("Salted %q %q to get %q", creds.Tokens[0], remote, token)
// Remove api_token=... from the query string, in case we
// end up forwarding the request.
- if values, err := url.ParseQuery(req.URL.RawQuery); err != nil {
- return err
+ if values, err := url.ParseQuery(updatedReq.URL.RawQuery); err != nil {
+ return nil, err
} else if _, ok := values["api_token"]; ok {
delete(values, "api_token")
- req.URL.RawQuery = values.Encode()
+ updatedReq.URL = &url.URL{
+ Scheme: req.URL.Scheme,
+ Host: req.URL.Host,
+ Path: req.URL.Path,
+ RawPath: req.URL.RawPath,
+ RawQuery: values.Encode(),
+ }
}
- return nil
+ return updatedReq, nil
}
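The essential move in this commit is that saltAuthToken now builds a fresh *http.Request (sharing the body, but with its own Header, and its own URL when the query string changes) instead of mutating the caller's request. A stripped-down sketch of that copy step, with an invented helper name and example token values:

package main

import (
	"fmt"
	"net/http"
)

func withSaltedToken(req *http.Request, token string) *http.Request {
	// Shallow copy of the request; the body is shared, everything else is ours.
	updated := (&http.Request{
		Method:        req.Method,
		URL:           req.URL,
		Body:          req.Body,
		ContentLength: req.ContentLength,
		Host:          req.Host,
	}).WithContext(req.Context())

	// Copy headers into a fresh map so the original request is not modified.
	updated.Header = http.Header{}
	for k, v := range req.Header {
		if k != "Authorization" {
			updated.Header[k] = v
		}
	}
	updated.Header.Set("Authorization", "Bearer "+token)
	return updated
}

func main() {
	orig, _ := http.NewRequest("GET", "https://example.arvadosapi.com/arvados/v1/users/current", nil)
	orig.Header.Set("Authorization", "Bearer original-token")
	salted := withSaltedToken(orig, "salted-token")
	fmt.Println(orig.Header.Get("Authorization"), "->", salted.Header.Get("Authorization"))
}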
-----------------------------------------------------------------------
hooks/post-receive