[ARVADOS] updated: 1.2.0-114-gcdaca3963

Git user git at public.curoverse.com
Mon Oct 1 15:49:31 EDT 2018


Summary of changes:
 doc/api/methods.html.textile.liquid |   8 +-
 lib/controller/federation.go        | 235 ++++++++++++++++++------------------
 lib/controller/federation_test.go   |   4 +-
 sdk/go/arvados/config.go            |  18 +--
 4 files changed, 131 insertions(+), 134 deletions(-)

       via  cdaca3963cf2d07e81eb71ac33e4c0966dec9b93 (commit)
      from  8ba7223586eabdbc2bcc2cf8cc143ce624286e50 (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit cdaca3963cf2d07e81eb71ac33e4c0966dec9b93
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Mon Oct 1 15:48:51 2018 -0400

    13619: Code cleanups
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/doc/api/methods.html.textile.liquid b/doc/api/methods.html.textile.liquid
index 6f3426523..067018f9d 100644
--- a/doc/api/methods.html.textile.liquid
+++ b/doc/api/methods.html.textile.liquid
@@ -125,13 +125,13 @@ h4. Federated listing
 
 Federated listing forwards a request to multiple clusters and combines the results.  Currently only a very restricted form of the "list" method is supported.
 
-To query multiple clusters, the list method must:
+To query multiple clusters, the list request must:
 
 * Have filters only matching @[["uuid", "in", [...]]@ or @["uuid", "=", "..."]@
-* Must specify @count=none@
+* Specify @count=none@
 * If @select@ is specified, it must include @uuid@
-* Must not specify @limit@, @offset@ or @order@
-* Must not request more items than the maximum response size
+* Not specify @limit@, @offset@ or @order@
+* Not request more items than the maximum response size
 
 This form may be used to request a specific list of objects by uuid which are owned by multiple clusters.
 
diff --git a/lib/controller/federation.go b/lib/controller/federation.go
index 1d4844486..51b3107ea 100644
--- a/lib/controller/federation.go
+++ b/lib/controller/federation.go
@@ -73,51 +73,35 @@ func (h *Handler) remoteClusterRequest(remoteID string, w http.ResponseWriter, r
 	h.proxy.Do(w, req, urlOut, client, filter)
 }
 
-// loadParamsFromForm expects a request with
-// application/x-www-form-urlencoded body.  It parses the query, adds
-// the query parameters to "params", and replaces the request body
-// with a buffer holding the original body contents so it can be
-// re-read by downstream proxy steps.
-func loadParamsFromForm(req *http.Request, params url.Values) error {
-	body, err := ioutil.ReadAll(req.Body)
-	if err != nil {
-		return err
-	}
-	req.Body = ioutil.NopCloser(bytes.NewBuffer(body))
-	var v2 url.Values
-	if v2, err = url.ParseQuery(string(body)); err != nil {
-		return err
-	}
-	for k, v := range v2 {
-		params[k] = append(params[k], v...)
-	}
-	return nil
-}
-
-// loadParamsFromForm expects a request with application/json body.
-// It parses the body, populates "loadInto", and replaces the request
-// body with a buffer holding the original body contents so it can be
-// re-read by downstream proxy steps.
-func loadParamsFromJson(req *http.Request, loadInto interface{}) error {
-	var cl int64
-	if req.ContentLength > 0 {
-		cl = req.ContentLength
+// Buffer request body, parse form parameters in request, and then
+// replace original body with the buffer so it can be re-read by
+// downstream proxy steps.
+func loadParamsFromForm(req *http.Request) error {
+	var postBody *bytes.Buffer
+	if req.Body != nil && req.Header.Get("Content-Type") == "application/x-www-form-urlencoded" {
+		var cl int64
+		if req.ContentLength > 0 {
+			cl = req.ContentLength
+		}
+		postBody = bytes.NewBuffer(make([]byte, 0, cl))
+		originalBody := req.Body
+		defer originalBody.Close()
+		req.Body = ioutil.NopCloser(io.TeeReader(req.Body, postBody))
 	}
-	postBody := bytes.NewBuffer(make([]byte, 0, cl))
-	defer req.Body.Close()
-
-	rdr := io.TeeReader(req.Body, postBody)
 
-	err := json.NewDecoder(rdr).Decode(loadInto)
+	err := req.ParseForm()
 	if err != nil {
 		return err
 	}
-	req.Body = ioutil.NopCloser(postBody)
+
+	if req.Body != nil && postBody != nil {
+		req.Body = ioutil.NopCloser(postBody)
+	}
 	return nil
 }
 
 type multiClusterQueryResponseCollector struct {
-	responses []interface{}
+	responses []map[string]interface{}
 	error     error
 	kind      string
 	clusterID string
@@ -131,38 +115,48 @@ func (c *multiClusterQueryResponseCollector) collectResponse(resp *http.Response
 	}
 
 	defer resp.Body.Close()
-	loadInto := make(map[string]interface{})
+	var loadInto struct {
+		Kind   string                   `json:"kind"`
+		Items  []map[string]interface{} `json:"items"`
+		Errors []string                 `json:"errors"`
+	}
 	err = json.NewDecoder(resp.Body).Decode(&loadInto)
 
-	if err == nil {
-		if resp.StatusCode != http.StatusOK {
-			c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, loadInto["errors"])
-		} else {
-			c.responses = loadInto["items"].([]interface{})
-			c.kind, _ = loadInto["kind"].(string)
-		}
-	} else {
+	if err != nil {
 		c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, err)
+		return nil, nil
+	}
+	if resp.StatusCode != http.StatusOK {
+		c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, loadInto.Errors)
+		return nil, nil
 	}
 
+	c.responses = loadInto.Items
+	c.kind = loadInto.Kind
+
 	return nil, nil
 }
 
 func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter,
-	req *http.Request, params url.Values,
-	clusterID string, uuids []string) (rp []interface{}, kind string, err error) {
+	req *http.Request,
+	clusterID string, uuids []string) (rp []map[string]interface{}, kind string, err error) {
 
 	found := make(map[string]bool)
-	for len(uuids) > 0 {
+	prev_len_uuids := len(uuids) + 1
+	// Loop while
+	// (1) there are more uuids to query
+	// (2) we're making progress - on each iteration the set of
+	// uuids we are expecting for must shrink.
+	for len(uuids) > 0 && len(uuids) < prev_len_uuids {
 		var remoteReq http.Request
 		remoteReq.Header = req.Header
 		remoteReq.Method = "POST"
 		remoteReq.URL = &url.URL{Path: req.URL.Path}
 		remoteParams := make(url.Values)
-		remoteParams["_method"] = []string{"GET"}
-		remoteParams["count"] = []string{"none"}
-		if len(params["select"]) != 0 {
-			remoteParams["select"] = params["select"]
+		remoteParams.Set("_method", "GET")
+		remoteParams.Set("count", "none")
+		if req.Form.Get("select") != "" {
+			remoteParams.Set("select", req.Form.Get("select"))
 		}
 		content, err := json.Marshal(uuids)
 		if err != nil {
@@ -200,12 +194,9 @@ func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter,
 		// around and do another request with just the
 		// stragglers.
 		for _, i := range rc.responses {
-			m, ok := i.(map[string]interface{})
+			uuid, ok := i["uuid"].(string)
 			if ok {
-				uuid, ok := m["uuid"].(string)
-				if ok {
-					found[uuid] = true
-				}
+				found[uuid] = true
 			}
 		}
 
@@ -215,17 +206,32 @@ func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter,
 				l = append(l, u)
 			}
 		}
+		prev_len_uuids = len(uuids)
 		uuids = l
 	}
 
 	return rp, kind, nil
 }
 
-func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.ResponseWriter, req *http.Request,
-	params url.Values, clusterId *string) bool {
+func (h *Handler) FederatedRequestConcurrency() int {
+	if h.Cluster.FederatedRequestConcurrency == 0 {
+		return 4
+	}
+	return h.Cluster.FederatedRequestConcurrency
+}
+
+func (h *Handler) MaxItemsPerResponse() int {
+	if h.Cluster.MaxItemsPerResponse == 0 {
+		return 1000
+	}
+	return h.Cluster.MaxItemsPerResponse
+}
+
+func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.ResponseWriter,
+	req *http.Request, clusterId *string) bool {
 
 	var filters [][]interface{}
-	err := json.Unmarshal([]byte(params["filters"][0]), &filters)
+	err := json.Unmarshal([]byte(req.Form.Get("filters")), &filters)
 	if err != nil {
 		httpserver.Error(w, err.Error(), http.StatusBadRequest)
 		return true
@@ -234,40 +240,40 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
 	// Split the list of uuids by prefix
 	queryClusters := make(map[string][]string)
 	expectCount := 0
-	for _, f1 := range filters {
-		if len(f1) != 3 {
+	for _, filter := range filters {
+		if len(filter) != 3 {
+			return false
+		}
+
+		if lhs, ok := filter[0].(string); !ok || lhs != "uuid" {
+			return false
+		}
+
+		op, ok := filter[1].(string)
+		if !ok {
 			return false
 		}
-		lhs, ok := f1[0].(string)
-		if ok && lhs == "uuid" {
-			op, ok := f1[1].(string)
-			if !ok {
-				return false
-			}
 
-			if op == "in" {
-				rhs, ok := f1[2].([]interface{})
-				if ok {
-					for _, i := range rhs {
-						u := i.(string)
+		if op == "in" {
+			if rhs, ok := filter[2].([]interface{}); ok {
+				for _, i := range rhs {
+					if u, ok := i.(string); ok {
 						*clusterId = u[0:5]
 						queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
+						expectCount += 1
 					}
-					expectCount += len(rhs)
 				}
-			} else if op == "=" {
-				u, ok := f1[2].(string)
-				if ok {
-					*clusterId = u[0:5]
-					queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
-					expectCount += 1
-				}
-			} else {
-				return false
+			}
+		} else if op == "=" {
+			if u, ok := filter[2].(string); ok {
+				*clusterId = u[0:5]
+				queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
+				expectCount += 1
 			}
 		} else {
 			return false
 		}
+
 	}
 
 	if len(queryClusters) <= 1 {
@@ -277,31 +283,31 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
 	}
 
 	// Validations
-	if !(len(params["count"]) == 1 && (params["count"][0] == `none` ||
-		params["count"][0] == `"none"`)) {
+	count := req.Form.Get("count")
+	if count != "" && count != `none` && count != `"none"` {
 		httpserver.Error(w, "Federated multi-object query must have 'count=none'", http.StatusBadRequest)
 		return true
 	}
-	if len(params["limit"]) != 0 || len(params["offset"]) != 0 || len(params["order"]) != 0 {
+	if req.Form.Get("limit") != "" || req.Form.Get("offset") != "" || req.Form.Get("order") != "" {
 		httpserver.Error(w, "Federated multi-object may not provide 'limit', 'offset' or 'order'.", http.StatusBadRequest)
 		return true
 	}
-	if expectCount > h.handler.Cluster.MaxItemsPerResponse {
+	if expectCount > h.handler.MaxItemsPerResponse() {
 		httpserver.Error(w, fmt.Sprintf("Federated multi-object request for %v objects which is more than max page size %v.",
-			expectCount, h.handler.Cluster.MaxItemsPerResponse), http.StatusBadRequest)
+			expectCount, h.handler.MaxItemsPerResponse()), http.StatusBadRequest)
 		return true
 	}
-	if len(params["select"]) == 1 {
+	if req.Form.Get("select") != "" {
 		foundUUID := false
-		var selects []interface{}
-		err := json.Unmarshal([]byte(params["select"][0]), &selects)
+		var selects []string
+		err := json.Unmarshal([]byte(req.Form.Get("select")), &selects)
 		if err != nil {
 			httpserver.Error(w, err.Error(), http.StatusBadRequest)
 			return true
 		}
 
 		for _, r := range selects {
-			if r.(string) == "uuid" {
+			if r == "uuid" {
 				foundUUID = true
 				break
 			}
@@ -312,18 +318,18 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
 		}
 	}
 
-	// Perform parallel requests to each cluster
+	// Perform concurrent requests to each cluster
 
-	// use channel as a semaphore to limit the number of parallel
+	// use channel as a semaphore to limit the number of concurrent
 	// requests at a time
-	sem := make(chan bool, h.handler.Cluster.ParallelRemoteRequests)
+	sem := make(chan bool, h.handler.FederatedRequestConcurrency())
 	defer close(sem)
 	wg := sync.WaitGroup{}
 
 	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
 	mtx := sync.Mutex{}
 	errors := []error{}
-	var completeResponses []interface{}
+	var completeResponses []map[string]interface{}
 	var kind string
 
 	for k, v := range queryClusters {
@@ -337,7 +343,7 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
 		sem <- true
 		wg.Add(1)
 		go func(k string, v []string) {
-			rp, kn, err := h.remoteQueryUUIDs(w, req, params, k, v)
+			rp, kn, err := h.remoteQueryUUIDs(w, req, k, v)
 			mtx.Lock()
 			if err == nil {
 				completeResponses = append(completeResponses, rp...)
@@ -379,25 +385,15 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h
 		clusterId = m[2]
 	}
 
-	// First, parse the query portion of the URL.
-	var params url.Values
-	var err error
-	if params, err = url.ParseQuery(req.URL.RawQuery); err != nil {
+	// Get form parameters from URL and form body (if POST).
+	if err := loadParamsFromForm(req); err != nil {
 		httpserver.Error(w, err.Error(), http.StatusBadRequest)
 		return
 	}
 
-	// Next, if appropriate, merge in parameters from the form POST body.
-	if req.Method == "POST" && req.Header.Get("Content-Type") == "application/x-www-form-urlencoded" {
-		if err = loadParamsFromForm(req, params); err != nil {
-			httpserver.Error(w, err.Error(), http.StatusBadRequest)
-			return
-		}
-	}
-
 	// Check if the parameters have an explicit cluster_id
-	if len(params["cluster_id"]) == 1 {
-		clusterId = params["cluster_id"][0]
+	if req.Form.Get("cluster_id") != "" {
+		clusterId = req.Form.Get("cluster_id")
 	}
 
 	// Handle the POST-as-GET special case (workaround for large
@@ -405,14 +401,15 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h
 	// like multi-object queries where the filter has 100s of
 	// items)
 	effectiveMethod := req.Method
-	if req.Method == "POST" && len(params["_method"]) == 1 {
-		effectiveMethod = params["_method"][0]
+	if req.Method == "POST" && req.Form.Get("_method") != "" {
+		effectiveMethod = req.Form.Get("_method")
 	}
 
-	if effectiveMethod == "GET" && clusterId == "" && len(params["filters"]) == 1 {
-		if h.handleMultiClusterQuery(w, req, params, &clusterId) {
-			return
-		}
+	if effectiveMethod == "GET" &&
+		clusterId == "" &&
+		req.Form.Get("filters") != "" &&
+		h.handleMultiClusterQuery(w, req, &clusterId) {
+		return
 	}
 
 	if clusterId == "" || clusterId == h.handler.Cluster.ClusterID {
@@ -667,9 +664,9 @@ func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req
 	var errors []string
 	var errorCode int = 404
 
-	// use channel as a semaphore to limit the number of parallel
+	// use channel as a semaphore to limit the number of concurrent
 	// requests at a time
-	sem := make(chan bool, h.handler.Cluster.ParallelRemoteRequests)
+	sem := make(chan bool, h.handler.FederatedRequestConcurrency())
 	defer close(sem)
 	for remoteID := range h.handler.Cluster.RemoteClusters {
 		// blocks until it can put a value into the
diff --git a/lib/controller/federation_test.go b/lib/controller/federation_test.go
index 0b62ce5ff..1b099be5e 100644
--- a/lib/controller/federation_test.go
+++ b/lib/controller/federation_test.go
@@ -63,8 +63,8 @@ func (s *FederationSuite) SetUpTest(c *check.C) {
 		NodeProfiles: map[string]arvados.NodeProfile{
 			"*": nodeProfile,
 		},
-		MaxItemsPerResponse:    1000,
-		ParallelRemoteRequests: 4,
+		MaxItemsPerResponse:         1000,
+		FederatedRequestConcurrency: 4,
 	}, NodeProfile: &nodeProfile}
 	s.testServer = newServerFromIntegrationTestEnv(c)
 	s.testServer.Server.Handler = httpserver.AddRequestIDs(httpserver.LogRequests(s.log, s.testHandler))
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index f309ac7bd..7101693f3 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -51,15 +51,15 @@ func (sc *Config) GetCluster(clusterID string) (*Cluster, error) {
 }
 
 type Cluster struct {
-	ClusterID              string `json:"-"`
-	ManagementToken        string
-	NodeProfiles           map[string]NodeProfile
-	InstanceTypes          InstanceTypeMap
-	HTTPRequestTimeout     Duration
-	RemoteClusters         map[string]RemoteCluster
-	PostgreSQL             PostgreSQL
-	MaxItemsPerResponse    int
-	ParallelRemoteRequests int
+	ClusterID                   string `json:"-"`
+	ManagementToken             string
+	NodeProfiles                map[string]NodeProfile
+	InstanceTypes               InstanceTypeMap
+	HTTPRequestTimeout          Duration
+	RemoteClusters              map[string]RemoteCluster
+	PostgreSQL                  PostgreSQL
+	MaxItemsPerResponse         int
+	FederatedRequestConcurrency int
 }
 
 type PostgreSQL struct {

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list