[ARVADOS] updated: 1.2.0-108-g0b0ca98b0
Git user
git at public.curoverse.com
Thu Sep 27 16:48:19 EDT 2018
Summary of changes:
lib/controller/federation.go | 79 +++++++++++++++++++++++++++------------
lib/controller/federation_test.go | 34 +++--------------
2 files changed, 60 insertions(+), 53 deletions(-)
via 0b0ca98b0777b7a2043d28536c7addda4eacd71d (commit)
via 045ac0dabebd02d56a4a116cf02eab36c00a9007 (commit)
from e9a3b4476759582e49404a0a1f8e820ad5d97fcd (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 0b0ca98b0777b7a2043d28536c7addda4eacd71d
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Thu Sep 27 16:44:53 2018 -0400
13619: Support [uuid, =, ...], cleanups
Remove call to code that looks for cluster_id from json payload of
POST, the SDKs don't generate this (cluster_id goes in the URL query
string or form encoded).
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/lib/controller/federation.go b/lib/controller/federation.go
index 04e0e74e3..72a147ae7 100644
--- a/lib/controller/federation.go
+++ b/lib/controller/federation.go
@@ -155,12 +155,6 @@ func (c *multiClusterQueryResponseCollector) collectResponse(resp *http.Response
func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.ResponseWriter, req *http.Request,
params url.Values, clusterId *string) bool {
- if !(len(params["count"]) == 1 && (params["count"][0] == `none` ||
- params["count"][0] == `"none"`)) {
- // don't federate unless params has count=none
- return false
- }
-
var filters [][]interface{}
err := json.Unmarshal([]byte(params["filters"][0]), &filters)
if err != nil {
@@ -173,22 +167,43 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
if len(filters) == 1 && len(filters[0]) == 3 {
f1 := filters[0]
lhs := f1[0].(string)
- op := f1[1].(string)
- rhs := f1[2].([]interface{})
- if lhs == "uuid" && op == "in" {
- for _, i := range rhs {
- u := i.(string)
- *clusterId = u[0:5]
- queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
+ if lhs == "uuid" {
+ op, ok := f1[1].(string)
+ if !ok {
+ return false
+ }
+
+ if op == "in" {
+ rhs, ok := f1[2].([]interface{})
+ if ok {
+ for _, i := range rhs {
+ u := i.(string)
+ *clusterId = u[0:5]
+ queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
+ }
+ }
+ } else if op == "=" {
+ u, ok := f1[2].(string)
+ if ok {
+ *clusterId = u[0:5]
+ queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
+ }
}
}
+
}
- if len(queryClusters) == 0 {
- // Didn't find any ["uuid", "in", ...] filters
+ if len(queryClusters) <= 1 {
+ // Didn't find ["uuid", "in", ...] filters for multiple clusters
return false
}
+ if !(len(params["count"]) == 1 && (params["count"][0] == `none` ||
+ params["count"][0] == `"none"`)) {
+ httpserver.Error(w, "Federated multi-object query must have count=none", http.StatusBadRequest)
+ return true
+ }
+
wg := sync.WaitGroup{}
// use channel as a semaphore to limit it to 4
@@ -285,18 +300,10 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h
clusterId = params["cluster_id"][0]
}
- // TODO: decide if this actually makes sense...
- if clusterId == "" && req.Method == "POST" && req.Header.Get("Content-Type") == "application/json" {
- var hasClusterId struct {
- ClusterID string `json:"cluster_id"`
- }
- if err = loadParamsFromJson(req, &hasClusterId); err != nil {
- httpserver.Error(w, err.Error(), http.StatusBadRequest)
- return
- }
- clusterId = hasClusterId.ClusterID
- }
-
+ // Handle the POST-as-GET special case (workaround for large
+ // GET requests that potentially exceed maximum URL length,
+ // like multi-object queries where the filter has 100s of
+ // items)
effectiveMethod := req.Method
if req.Method == "POST" && len(params["_method"]) == 1 {
effectiveMethod = params["_method"][0]
diff --git a/lib/controller/federation_test.go b/lib/controller/federation_test.go
index 255adfe92..f165aecff 100644
--- a/lib/controller/federation_test.go
+++ b/lib/controller/federation_test.go
@@ -538,31 +538,7 @@ func (s *FederationSuite) TestUpdateRemoteContainerRequest(c *check.C) {
c.Check(cr.Priority, check.Equals, 696)
}
-func (s *FederationSuite) TestCreateRemoteContainerRequest1(c *check.C) {
- defer s.localServiceReturns404(c).Close()
- req := httptest.NewRequest("POST", "/arvados/v1/container_requests",
- strings.NewReader(`{
- "cluster_id": "zzzzz",
- "container_request": {
- "name": "hello world",
- "state": "Uncommitted",
- "output_path": "/",
- "container_image": "123",
- "command": ["abc"]
- }
-}
-`))
- req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
- req.Header.Set("Content-type", "application/json")
- resp := s.testRequest(req)
- c.Check(resp.StatusCode, check.Equals, http.StatusOK)
- var cr arvados.ContainerRequest
- c.Check(json.NewDecoder(resp.Body).Decode(&cr), check.IsNil)
- c.Check(cr.Name, check.Equals, "hello world")
- c.Check(strings.HasPrefix(cr.UUID, "zzzzz-"), check.Equals, true)
-}
-
-func (s *FederationSuite) TestCreateRemoteContainerRequest2(c *check.C) {
+func (s *FederationSuite) TestCreateRemoteContainerRequest(c *check.C) {
defer s.localServiceReturns404(c).Close()
// pass cluster_id via query parameter, this allows arvados-controller
// to avoid parsing the body
commit 045ac0dabebd02d56a4a116cf02eab36c00a9007
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Thu Sep 27 14:52:22 2018 -0400
13619: Clean up, require count=none
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/lib/controller/federation.go b/lib/controller/federation.go
index caa84ca5f..04e0e74e3 100644
--- a/lib/controller/federation.go
+++ b/lib/controller/federation.go
@@ -73,6 +73,11 @@ func (h *Handler) remoteClusterRequest(remoteID string, w http.ResponseWriter, r
h.proxy.Do(w, req, urlOut, client, filter)
}
+// loadParamsFromForm expects a request with
+// application/x-www-form-urlencoded body. It parses the query, adds
+// the query parameters to "params", and replaces the request body
+// with a buffer holding the original body contents so it can be
+// re-read by downstream proxy steps.
func loadParamsFromForm(req *http.Request, params url.Values) error {
body, err := ioutil.ReadAll(req.Body)
if err != nil {
@@ -89,6 +94,10 @@ func loadParamsFromForm(req *http.Request, params url.Values) error {
return nil
}
+// loadParamsFromForm expects a request with application/json body.
+// It parses the body, populates "loadInto", and replaces the request
+// body with a buffer holding the original body contents so it can be
+// re-read by downstream proxy steps.
func loadParamsFromJson(req *http.Request, loadInto interface{}) error {
var cl int64
if req.ContentLength > 0 {
@@ -107,13 +116,15 @@ func loadParamsFromJson(req *http.Request, loadInto interface{}) error {
return nil
}
-type responseCollector struct {
+type multiClusterQueryResponseCollector struct {
mtx sync.Mutex
responses []interface{}
errors []error
+ kind string
}
-func (c *responseCollector) collectResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
+func (c *multiClusterQueryResponseCollector) collectResponse(resp *http.Response,
+ requestError error) (newResponse *http.Response, err error) {
if requestError != nil {
c.mtx.Lock()
defer c.mtx.Unlock()
@@ -132,6 +143,7 @@ func (c *responseCollector) collectResponse(resp *http.Response, requestError er
c.errors = append(c.errors, fmt.Errorf("error %v", loadInto["errors"]))
} else {
c.responses = append(c.responses, loadInto["items"].([]interface{})...)
+ c.kind = loadInto["kind"].(string)
}
} else {
c.errors = append(c.errors, err)
@@ -143,12 +155,20 @@ func (c *responseCollector) collectResponse(resp *http.Response, requestError er
func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.ResponseWriter, req *http.Request,
params url.Values, clusterId *string) bool {
+ if !(len(params["count"]) == 1 && (params["count"][0] == `none` ||
+ params["count"][0] == `"none"`)) {
+ // don't federate unless params has count=none
+ return false
+ }
+
var filters [][]interface{}
err := json.Unmarshal([]byte(params["filters"][0]), &filters)
if err != nil {
httpserver.Error(w, err.Error(), http.StatusBadRequest)
return true
}
+
+ // Split the list of uuids by prefix
queryClusters := make(map[string][]string)
if len(filters) == 1 && len(filters[0]) == 3 {
f1 := filters[0]
@@ -164,13 +184,12 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
}
}
- if len(queryClusters) <= 1 {
+ if len(queryClusters) == 0 {
+ // Didn't find any ["uuid", "in", ...] filters
return false
}
wg := sync.WaitGroup{}
- //var errors []string
- //var errorCode int = 404
// use channel as a semaphore to limit it to 4
// parallel requests at a time
@@ -178,7 +197,7 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
defer close(sem)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
- rc := responseCollector{}
+ rc := multiClusterQueryResponseCollector{}
for k, v := range queryClusters {
// blocks until it can put a value into the
// channel (which has a max queue capacity)
@@ -195,6 +214,7 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
remoteReq.URL = &url.URL{Path: req.URL.Path}
remoteParams := make(url.Values)
remoteParams["_method"] = []string{"GET"}
+ remoteParams["count"] = []string{"none"}
content, err := json.Marshal(v)
if err != nil {
rc.mtx.Lock()
@@ -229,6 +249,7 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
w.WriteHeader(http.StatusOK)
itemList := make(map[string]interface{})
itemList["items"] = rc.responses
+ itemList["kind"] = rc.kind
json.NewEncoder(w).Encode(itemList)
}
@@ -243,6 +264,7 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h
clusterId = m[2]
}
+ // First, parse the query portion of the URL.
var params url.Values
var err error
if params, err = url.ParseQuery(req.URL.RawQuery); err != nil {
@@ -250,6 +272,7 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h
return
}
+ // Next, if appropriate, merge in parameters from the form POST body.
if req.Method == "POST" && req.Header.Get("Content-Type") == "application/x-www-form-urlencoded" {
if err = loadParamsFromForm(req, params); err != nil {
httpserver.Error(w, err.Error(), http.StatusBadRequest)
@@ -257,10 +280,12 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h
}
}
+ // Check if the parameters have an explicit cluster_id
if len(params["cluster_id"]) == 1 {
clusterId = params["cluster_id"][0]
}
+ // TODO: decide if this actually makes sense...
if clusterId == "" && req.Method == "POST" && req.Header.Get("Content-Type") == "application/json" {
var hasClusterId struct {
ClusterID string `json:"cluster_id"`
@@ -282,7 +307,6 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h
return
}
}
- //log.Printf("Clusterid is %q", clusterId)
if clusterId == "" || clusterId == h.handler.Cluster.ClusterID {
h.next.ServeHTTP(w, req)
diff --git a/lib/controller/federation_test.go b/lib/controller/federation_test.go
index 113fa9eeb..255adfe92 100644
--- a/lib/controller/federation_test.go
+++ b/lib/controller/federation_test.go
@@ -621,7 +621,7 @@ func (s *FederationSuite) TestGetRemoteContainer(c *check.C) {
func (s *FederationSuite) TestListRemoteContainer(c *check.C) {
defer s.localServiceReturns404(c).Close()
- req := httptest.NewRequest("GET", "/arvados/v1/containers?filters="+
+ req := httptest.NewRequest("GET", "/arvados/v1/containers?count=none&filters="+
url.QueryEscape(fmt.Sprintf(`[["uuid", "in", ["%v"]]]`, arvadostest.QueuedContainerUUID)), nil)
req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
resp := s.testRequest(req)
@@ -634,11 +634,11 @@ func (s *FederationSuite) TestListRemoteContainer(c *check.C) {
func (s *FederationSuite) TestListMultiRemoteContainers(c *check.C) {
defer s.localServiceHandler(c, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
bd, _ := ioutil.ReadAll(req.Body)
- c.Check(string(bd), check.Equals, `_method=GET&filters=%5B%5B%22uuid%22%2C+%22in%22%2C+%5B%22zhome-xvhdp-cr5queuedcontnr%22%5D%5D%5D`)
+ c.Check(string(bd), check.Equals, `_method=GET&count=none&filters=%5B%5B%22uuid%22%2C+%22in%22%2C+%5B%22zhome-xvhdp-cr5queuedcontnr%22%5D%5D%5D`)
w.WriteHeader(200)
- w.Write([]byte(`{"items": [{"uuid": "zhome-xvhdp-cr5queuedcontnr"}]}`))
+ w.Write([]byte(`{"kind": "arvados#containerList", "items": [{"uuid": "zhome-xvhdp-cr5queuedcontnr"}]}`))
})).Close()
- req := httptest.NewRequest("GET", "/arvados/v1/containers?filters="+
+ req := httptest.NewRequest("GET", "/arvados/v1/containers?count=none&filters="+
url.QueryEscape(fmt.Sprintf(`[["uuid", "in", ["%v", "zhome-xvhdp-cr5queuedcontnr"]]]`, arvadostest.QueuedContainerUUID)), nil)
req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
resp := s.testRequest(req)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list