[ARVADOS] updated: 1.2.0-108-g0b0ca98b0

Git user git at public.curoverse.com
Thu Sep 27 16:48:19 EDT 2018


Summary of changes:
 lib/controller/federation.go      | 79 +++++++++++++++++++++++++++------------
 lib/controller/federation_test.go | 34 +++--------------
 2 files changed, 60 insertions(+), 53 deletions(-)

       via  0b0ca98b0777b7a2043d28536c7addda4eacd71d (commit)
       via  045ac0dabebd02d56a4a116cf02eab36c00a9007 (commit)
      from  e9a3b4476759582e49404a0a1f8e820ad5d97fcd (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 0b0ca98b0777b7a2043d28536c7addda4eacd71d
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Thu Sep 27 16:44:53 2018 -0400

    13619: Support [uuid, =, ...], cleanups
    
    Remove the call to code that looks for cluster_id in the JSON payload
    of a POST; the SDKs don't generate this (cluster_id goes in the URL
    query string or is form-encoded).
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/lib/controller/federation.go b/lib/controller/federation.go
index 04e0e74e3..72a147ae7 100644
--- a/lib/controller/federation.go
+++ b/lib/controller/federation.go
@@ -155,12 +155,6 @@ func (c *multiClusterQueryResponseCollector) collectResponse(resp *http.Response
 func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.ResponseWriter, req *http.Request,
 	params url.Values, clusterId *string) bool {
 
-	if !(len(params["count"]) == 1 && (params["count"][0] == `none` ||
-		params["count"][0] == `"none"`)) {
-		// don't federate unless params has count=none
-		return false
-	}
-
 	var filters [][]interface{}
 	err := json.Unmarshal([]byte(params["filters"][0]), &filters)
 	if err != nil {
@@ -173,22 +167,43 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
 	if len(filters) == 1 && len(filters[0]) == 3 {
 		f1 := filters[0]
 		lhs := f1[0].(string)
-		op := f1[1].(string)
-		rhs := f1[2].([]interface{})
-		if lhs == "uuid" && op == "in" {
-			for _, i := range rhs {
-				u := i.(string)
-				*clusterId = u[0:5]
-				queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
+		if lhs == "uuid" {
+			op, ok := f1[1].(string)
+			if !ok {
+				return false
+			}
+
+			if op == "in" {
+				rhs, ok := f1[2].([]interface{})
+				if ok {
+					for _, i := range rhs {
+						u := i.(string)
+						*clusterId = u[0:5]
+						queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
+					}
+				}
+			} else if op == "=" {
+				u, ok := f1[2].(string)
+				if ok {
+					*clusterId = u[0:5]
+					queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
+				}
 			}
 		}
+
 	}
 
-	if len(queryClusters) == 0 {
-		// Didn't find any ["uuid", "in", ...] filters
+	if len(queryClusters) <= 1 {
+		// Didn't find ["uuid", "in", ...] filters for multiple clusters
 		return false
 	}
 
+	if !(len(params["count"]) == 1 && (params["count"][0] == `none` ||
+		params["count"][0] == `"none"`)) {
+		httpserver.Error(w, "Federated multi-object query must have count=none", http.StatusBadRequest)
+		return true
+	}
+
 	wg := sync.WaitGroup{}
 
 	// use channel as a semaphore to limit it to 4
@@ -285,18 +300,10 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h
 		clusterId = params["cluster_id"][0]
 	}
 
-	// TODO: decide if this actually makes sense...
-	if clusterId == "" && req.Method == "POST" && req.Header.Get("Content-Type") == "application/json" {
-		var hasClusterId struct {
-			ClusterID string `json:"cluster_id"`
-		}
-		if err = loadParamsFromJson(req, &hasClusterId); err != nil {
-			httpserver.Error(w, err.Error(), http.StatusBadRequest)
-			return
-		}
-		clusterId = hasClusterId.ClusterID
-	}
-
+	// Handle the POST-as-GET special case (workaround for large
+	// GET requests that potentially exceed maximum URL length,
+	// like multi-object queries where the filter has 100s of
+	// items)
 	effectiveMethod := req.Method
 	if req.Method == "POST" && len(params["_method"]) == 1 {
 		effectiveMethod = params["_method"][0]
diff --git a/lib/controller/federation_test.go b/lib/controller/federation_test.go
index 255adfe92..f165aecff 100644
--- a/lib/controller/federation_test.go
+++ b/lib/controller/federation_test.go
@@ -538,31 +538,7 @@ func (s *FederationSuite) TestUpdateRemoteContainerRequest(c *check.C) {
 	c.Check(cr.Priority, check.Equals, 696)
 }
 
-func (s *FederationSuite) TestCreateRemoteContainerRequest1(c *check.C) {
-	defer s.localServiceReturns404(c).Close()
-	req := httptest.NewRequest("POST", "/arvados/v1/container_requests",
-		strings.NewReader(`{
-  "cluster_id": "zzzzz",
-  "container_request": {
-    "name": "hello world",
-    "state": "Uncommitted",
-    "output_path": "/",
-    "container_image": "123",
-    "command": ["abc"]
-  }
-}
-`))
-	req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
-	req.Header.Set("Content-type", "application/json")
-	resp := s.testRequest(req)
-	c.Check(resp.StatusCode, check.Equals, http.StatusOK)
-	var cr arvados.ContainerRequest
-	c.Check(json.NewDecoder(resp.Body).Decode(&cr), check.IsNil)
-	c.Check(cr.Name, check.Equals, "hello world")
-	c.Check(strings.HasPrefix(cr.UUID, "zzzzz-"), check.Equals, true)
-}
-
-func (s *FederationSuite) TestCreateRemoteContainerRequest2(c *check.C) {
+func (s *FederationSuite) TestCreateRemoteContainerRequest(c *check.C) {
 	defer s.localServiceReturns404(c).Close()
 	// pass cluster_id via query parameter, this allows arvados-controller
 	// to avoid parsing the body

commit 045ac0dabebd02d56a4a116cf02eab36c00a9007
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Thu Sep 27 14:52:22 2018 -0400

    13619: Clean up, require count=none
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/lib/controller/federation.go b/lib/controller/federation.go
index caa84ca5f..04e0e74e3 100644
--- a/lib/controller/federation.go
+++ b/lib/controller/federation.go
@@ -73,6 +73,11 @@ func (h *Handler) remoteClusterRequest(remoteID string, w http.ResponseWriter, r
 	h.proxy.Do(w, req, urlOut, client, filter)
 }
 
+// loadParamsFromForm expects a request with
+// application/x-www-form-urlencoded body.  It parses the query, adds
+// the query parameters to "params", and replaces the request body
+// with a buffer holding the original body contents so it can be
+// re-read by downstream proxy steps.
 func loadParamsFromForm(req *http.Request, params url.Values) error {
 	body, err := ioutil.ReadAll(req.Body)
 	if err != nil {
@@ -89,6 +94,10 @@ func loadParamsFromForm(req *http.Request, params url.Values) error {
 	return nil
 }
 
+// loadParamsFromJson expects a request with application/json body.
+// It parses the body, populates "loadInto", and replaces the request
+// body with a buffer holding the original body contents so it can be
+// re-read by downstream proxy steps.
 func loadParamsFromJson(req *http.Request, loadInto interface{}) error {
 	var cl int64
 	if req.ContentLength > 0 {
@@ -107,13 +116,15 @@ func loadParamsFromJson(req *http.Request, loadInto interface{}) error {
 	return nil
 }
 
-type responseCollector struct {
+type multiClusterQueryResponseCollector struct {
 	mtx       sync.Mutex
 	responses []interface{}
 	errors    []error
+	kind      string
 }
 
-func (c *responseCollector) collectResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
+func (c *multiClusterQueryResponseCollector) collectResponse(resp *http.Response,
+	requestError error) (newResponse *http.Response, err error) {
 	if requestError != nil {
 		c.mtx.Lock()
 		defer c.mtx.Unlock()
@@ -132,6 +143,7 @@ func (c *responseCollector) collectResponse(resp *http.Response, requestError er
 			c.errors = append(c.errors, fmt.Errorf("error %v", loadInto["errors"]))
 		} else {
 			c.responses = append(c.responses, loadInto["items"].([]interface{})...)
+			c.kind = loadInto["kind"].(string)
 		}
 	} else {
 		c.errors = append(c.errors, err)
@@ -143,12 +155,20 @@ func (c *responseCollector) collectResponse(resp *http.Response, requestError er
 func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.ResponseWriter, req *http.Request,
 	params url.Values, clusterId *string) bool {
 
+	if !(len(params["count"]) == 1 && (params["count"][0] == `none` ||
+		params["count"][0] == `"none"`)) {
+		// don't federate unless params has count=none
+		return false
+	}
+
 	var filters [][]interface{}
 	err := json.Unmarshal([]byte(params["filters"][0]), &filters)
 	if err != nil {
 		httpserver.Error(w, err.Error(), http.StatusBadRequest)
 		return true
 	}
+
+	// Split the list of uuids by prefix
 	queryClusters := make(map[string][]string)
 	if len(filters) == 1 && len(filters[0]) == 3 {
 		f1 := filters[0]
@@ -164,13 +184,12 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
 		}
 	}
 
-	if len(queryClusters) <= 1 {
+	if len(queryClusters) == 0 {
+		// Didn't find any ["uuid", "in", ...] filters
 		return false
 	}
 
 	wg := sync.WaitGroup{}
-	//var errors []string
-	//var errorCode int = 404
 
 	// use channel as a semaphore to limit it to 4
 	// parallel requests at a time
@@ -178,7 +197,7 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
 	defer close(sem)
 	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
 
-	rc := responseCollector{}
+	rc := multiClusterQueryResponseCollector{}
 	for k, v := range queryClusters {
 		// blocks until it can put a value into the
 		// channel (which has a max queue capacity)
@@ -195,6 +214,7 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
 			remoteReq.URL = &url.URL{Path: req.URL.Path}
 			remoteParams := make(url.Values)
 			remoteParams["_method"] = []string{"GET"}
+			remoteParams["count"] = []string{"none"}
 			content, err := json.Marshal(v)
 			if err != nil {
 				rc.mtx.Lock()
@@ -229,6 +249,7 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
 		w.WriteHeader(http.StatusOK)
 		itemList := make(map[string]interface{})
 		itemList["items"] = rc.responses
+		itemList["kind"] = rc.kind
 		json.NewEncoder(w).Encode(itemList)
 	}
 
@@ -243,6 +264,7 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h
 		clusterId = m[2]
 	}
 
+	// First, parse the query portion of the URL.
 	var params url.Values
 	var err error
 	if params, err = url.ParseQuery(req.URL.RawQuery); err != nil {
@@ -250,6 +272,7 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h
 		return
 	}
 
+	// Next, if appropriate, merge in parameters from the form POST body.
 	if req.Method == "POST" && req.Header.Get("Content-Type") == "application/x-www-form-urlencoded" {
 		if err = loadParamsFromForm(req, params); err != nil {
 			httpserver.Error(w, err.Error(), http.StatusBadRequest)
@@ -257,10 +280,12 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h
 		}
 	}
 
+	// Check if the parameters have an explicit cluster_id
 	if len(params["cluster_id"]) == 1 {
 		clusterId = params["cluster_id"][0]
 	}
 
+	// TODO: decide if this actually makes sense...
 	if clusterId == "" && req.Method == "POST" && req.Header.Get("Content-Type") == "application/json" {
 		var hasClusterId struct {
 			ClusterID string `json:"cluster_id"`
@@ -282,7 +307,6 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h
 			return
 		}
 	}
-	//log.Printf("Clusterid is %q", clusterId)
 
 	if clusterId == "" || clusterId == h.handler.Cluster.ClusterID {
 		h.next.ServeHTTP(w, req)
diff --git a/lib/controller/federation_test.go b/lib/controller/federation_test.go
index 113fa9eeb..255adfe92 100644
--- a/lib/controller/federation_test.go
+++ b/lib/controller/federation_test.go
@@ -621,7 +621,7 @@ func (s *FederationSuite) TestGetRemoteContainer(c *check.C) {
 
 func (s *FederationSuite) TestListRemoteContainer(c *check.C) {
 	defer s.localServiceReturns404(c).Close()
-	req := httptest.NewRequest("GET", "/arvados/v1/containers?filters="+
+	req := httptest.NewRequest("GET", "/arvados/v1/containers?count=none&filters="+
 		url.QueryEscape(fmt.Sprintf(`[["uuid", "in", ["%v"]]]`, arvadostest.QueuedContainerUUID)), nil)
 	req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
 	resp := s.testRequest(req)
@@ -634,11 +634,11 @@ func (s *FederationSuite) TestListRemoteContainer(c *check.C) {
 func (s *FederationSuite) TestListMultiRemoteContainers(c *check.C) {
 	defer s.localServiceHandler(c, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
 		bd, _ := ioutil.ReadAll(req.Body)
-		c.Check(string(bd), check.Equals, `_method=GET&filters=%5B%5B%22uuid%22%2C+%22in%22%2C+%5B%22zhome-xvhdp-cr5queuedcontnr%22%5D%5D%5D`)
+		c.Check(string(bd), check.Equals, `_method=GET&count=none&filters=%5B%5B%22uuid%22%2C+%22in%22%2C+%5B%22zhome-xvhdp-cr5queuedcontnr%22%5D%5D%5D`)
 		w.WriteHeader(200)
-		w.Write([]byte(`{"items": [{"uuid": "zhome-xvhdp-cr5queuedcontnr"}]}`))
+		w.Write([]byte(`{"kind": "arvados#containerList", "items": [{"uuid": "zhome-xvhdp-cr5queuedcontnr"}]}`))
 	})).Close()
-	req := httptest.NewRequest("GET", "/arvados/v1/containers?filters="+
+	req := httptest.NewRequest("GET", "/arvados/v1/containers?count=none&filters="+
 		url.QueryEscape(fmt.Sprintf(`[["uuid", "in", ["%v", "zhome-xvhdp-cr5queuedcontnr"]]]`, arvadostest.QueuedContainerUUID)), nil)
 	req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
 	resp := s.testRequest(req)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list