[ARVADOS] updated: 1.2.0-114-gcdaca3963
Git user
git at public.curoverse.com
Mon Oct 1 15:49:31 EDT 2018
Summary of changes:
doc/api/methods.html.textile.liquid | 8 +-
lib/controller/federation.go | 235 ++++++++++++++++++------------------
lib/controller/federation_test.go | 4 +-
sdk/go/arvados/config.go | 18 +--
4 files changed, 131 insertions(+), 134 deletions(-)
via cdaca3963cf2d07e81eb71ac33e4c0966dec9b93 (commit)
from 8ba7223586eabdbc2bcc2cf8cc143ce624286e50 (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit cdaca3963cf2d07e81eb71ac33e4c0966dec9b93
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Mon Oct 1 15:48:51 2018 -0400
13619: Code cleanups
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/doc/api/methods.html.textile.liquid b/doc/api/methods.html.textile.liquid
index 6f3426523..067018f9d 100644
--- a/doc/api/methods.html.textile.liquid
+++ b/doc/api/methods.html.textile.liquid
@@ -125,13 +125,13 @@ h4. Federated listing
Federated listing forwards a request to multiple clusters and combines the results. Currently only a very restricted form of the "list" method is supported.
-To query multiple clusters, the list method must:
+To query multiple clusters, the list request must:
* Have filters only matching @["uuid", "in", [...]]@ or @["uuid", "=", "..."]@
-* Must specify @count=none@
+* Specify @count=none@
* If @select@ is specified, it must include @uuid@
-* Must not specify @limit@, @offset@ or @order@
-* Must not request more items than the maximum response size
+* Not specify @limit@, @offset@ or @order@
+* Not request more items than the maximum response size
This form may be used to request a specific list of objects by uuid which are owned by multiple clusters.
diff --git a/lib/controller/federation.go b/lib/controller/federation.go
index 1d4844486..51b3107ea 100644
--- a/lib/controller/federation.go
+++ b/lib/controller/federation.go
@@ -73,51 +73,35 @@ func (h *Handler) remoteClusterRequest(remoteID string, w http.ResponseWriter, r
h.proxy.Do(w, req, urlOut, client, filter)
}
-// loadParamsFromForm expects a request with
-// application/x-www-form-urlencoded body. It parses the query, adds
-// the query parameters to "params", and replaces the request body
-// with a buffer holding the original body contents so it can be
-// re-read by downstream proxy steps.
-func loadParamsFromForm(req *http.Request, params url.Values) error {
- body, err := ioutil.ReadAll(req.Body)
- if err != nil {
- return err
- }
- req.Body = ioutil.NopCloser(bytes.NewBuffer(body))
- var v2 url.Values
- if v2, err = url.ParseQuery(string(body)); err != nil {
- return err
- }
- for k, v := range v2 {
- params[k] = append(params[k], v...)
- }
- return nil
-}
-
-// loadParamsFromForm expects a request with application/json body.
-// It parses the body, populates "loadInto", and replaces the request
-// body with a buffer holding the original body contents so it can be
-// re-read by downstream proxy steps.
-func loadParamsFromJson(req *http.Request, loadInto interface{}) error {
- var cl int64
- if req.ContentLength > 0 {
- cl = req.ContentLength
+// Buffer request body, parse form parameters in request, and then
+// replace original body with the buffer so it can be re-read by
+// downstream proxy steps.
+func loadParamsFromForm(req *http.Request) error {
+ var postBody *bytes.Buffer
+ if req.Body != nil && req.Header.Get("Content-Type") == "application/x-www-form-urlencoded" {
+ var cl int64
+ if req.ContentLength > 0 {
+ cl = req.ContentLength
+ }
+ postBody = bytes.NewBuffer(make([]byte, 0, cl))
+ originalBody := req.Body
+ defer originalBody.Close()
+ req.Body = ioutil.NopCloser(io.TeeReader(req.Body, postBody))
}
- postBody := bytes.NewBuffer(make([]byte, 0, cl))
- defer req.Body.Close()
-
- rdr := io.TeeReader(req.Body, postBody)
- err := json.NewDecoder(rdr).Decode(loadInto)
+ err := req.ParseForm()
if err != nil {
return err
}
- req.Body = ioutil.NopCloser(postBody)
+
+ if req.Body != nil && postBody != nil {
+ req.Body = ioutil.NopCloser(postBody)
+ }
return nil
}
type multiClusterQueryResponseCollector struct {
- responses []interface{}
+ responses []map[string]interface{}
error error
kind string
clusterID string
@@ -131,38 +115,48 @@ func (c *multiClusterQueryResponseCollector) collectResponse(resp *http.Response
}
defer resp.Body.Close()
- loadInto := make(map[string]interface{})
+ var loadInto struct {
+ Kind string `json:"kind"`
+ Items []map[string]interface{} `json:"items"`
+ Errors []string `json:"errors"`
+ }
err = json.NewDecoder(resp.Body).Decode(&loadInto)
- if err == nil {
- if resp.StatusCode != http.StatusOK {
- c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, loadInto["errors"])
- } else {
- c.responses = loadInto["items"].([]interface{})
- c.kind, _ = loadInto["kind"].(string)
- }
- } else {
+ if err != nil {
c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, err)
+ return nil, nil
+ }
+ if resp.StatusCode != http.StatusOK {
+ c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, loadInto.Errors)
+ return nil, nil
}
+ c.responses = loadInto.Items
+ c.kind = loadInto.Kind
+
return nil, nil
}
func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter,
- req *http.Request, params url.Values,
- clusterID string, uuids []string) (rp []interface{}, kind string, err error) {
+ req *http.Request,
+ clusterID string, uuids []string) (rp []map[string]interface{}, kind string, err error) {
found := make(map[string]bool)
- for len(uuids) > 0 {
+ prev_len_uuids := len(uuids) + 1
+ // Loop while
+ // (1) there are more uuids to query
+ // (2) we're making progress - on each iteration the set of
+ // uuids we are still waiting for must shrink.
+ for len(uuids) > 0 && len(uuids) < prev_len_uuids {
var remoteReq http.Request
remoteReq.Header = req.Header
remoteReq.Method = "POST"
remoteReq.URL = &url.URL{Path: req.URL.Path}
remoteParams := make(url.Values)
- remoteParams["_method"] = []string{"GET"}
- remoteParams["count"] = []string{"none"}
- if len(params["select"]) != 0 {
- remoteParams["select"] = params["select"]
+ remoteParams.Set("_method", "GET")
+ remoteParams.Set("count", "none")
+ if req.Form.Get("select") != "" {
+ remoteParams.Set("select", req.Form.Get("select"))
}
content, err := json.Marshal(uuids)
if err != nil {
@@ -200,12 +194,9 @@ func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter,
// around and do another request with just the
// stragglers.
for _, i := range rc.responses {
- m, ok := i.(map[string]interface{})
+ uuid, ok := i["uuid"].(string)
if ok {
- uuid, ok := m["uuid"].(string)
- if ok {
- found[uuid] = true
- }
+ found[uuid] = true
}
}
@@ -215,17 +206,32 @@ func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter,
l = append(l, u)
}
}
+ prev_len_uuids = len(uuids)
uuids = l
}
return rp, kind, nil
}
-func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.ResponseWriter, req *http.Request,
- params url.Values, clusterId *string) bool {
+func (h *Handler) FederatedRequestConcurrency() int {
+ if h.Cluster.FederatedRequestConcurrency == 0 {
+ return 4
+ }
+ return h.Cluster.FederatedRequestConcurrency
+}
+
+func (h *Handler) MaxItemsPerResponse() int {
+ if h.Cluster.MaxItemsPerResponse == 0 {
+ return 1000
+ }
+ return h.Cluster.MaxItemsPerResponse
+}
+
+func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.ResponseWriter,
+ req *http.Request, clusterId *string) bool {
var filters [][]interface{}
- err := json.Unmarshal([]byte(params["filters"][0]), &filters)
+ err := json.Unmarshal([]byte(req.Form.Get("filters")), &filters)
if err != nil {
httpserver.Error(w, err.Error(), http.StatusBadRequest)
return true
@@ -234,40 +240,40 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
// Split the list of uuids by prefix
queryClusters := make(map[string][]string)
expectCount := 0
- for _, f1 := range filters {
- if len(f1) != 3 {
+ for _, filter := range filters {
+ if len(filter) != 3 {
+ return false
+ }
+
+ if lhs, ok := filter[0].(string); !ok || lhs != "uuid" {
+ return false
+ }
+
+ op, ok := filter[1].(string)
+ if !ok {
return false
}
- lhs, ok := f1[0].(string)
- if ok && lhs == "uuid" {
- op, ok := f1[1].(string)
- if !ok {
- return false
- }
- if op == "in" {
- rhs, ok := f1[2].([]interface{})
- if ok {
- for _, i := range rhs {
- u := i.(string)
+ if op == "in" {
+ if rhs, ok := filter[2].([]interface{}); ok {
+ for _, i := range rhs {
+ if u, ok := i.(string); ok {
*clusterId = u[0:5]
queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
+ expectCount += 1
}
- expectCount += len(rhs)
}
- } else if op == "=" {
- u, ok := f1[2].(string)
- if ok {
- *clusterId = u[0:5]
- queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
- expectCount += 1
- }
- } else {
- return false
+ }
+ } else if op == "=" {
+ if u, ok := filter[2].(string); ok {
+ *clusterId = u[0:5]
+ queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
+ expectCount += 1
}
} else {
return false
}
+
}
if len(queryClusters) <= 1 {
@@ -277,31 +283,31 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
}
// Validations
- if !(len(params["count"]) == 1 && (params["count"][0] == `none` ||
- params["count"][0] == `"none"`)) {
+ count := req.Form.Get("count")
+ if count != "" && count != `none` && count != `"none"` {
httpserver.Error(w, "Federated multi-object query must have 'count=none'", http.StatusBadRequest)
return true
}
- if len(params["limit"]) != 0 || len(params["offset"]) != 0 || len(params["order"]) != 0 {
+ if req.Form.Get("limit") != "" || req.Form.Get("offset") != "" || req.Form.Get("order") != "" {
httpserver.Error(w, "Federated multi-object may not provide 'limit', 'offset' or 'order'.", http.StatusBadRequest)
return true
}
- if expectCount > h.handler.Cluster.MaxItemsPerResponse {
+ if expectCount > h.handler.MaxItemsPerResponse() {
httpserver.Error(w, fmt.Sprintf("Federated multi-object request for %v objects which is more than max page size %v.",
- expectCount, h.handler.Cluster.MaxItemsPerResponse), http.StatusBadRequest)
+ expectCount, h.handler.MaxItemsPerResponse()), http.StatusBadRequest)
return true
}
- if len(params["select"]) == 1 {
+ if req.Form.Get("select") != "" {
foundUUID := false
- var selects []interface{}
- err := json.Unmarshal([]byte(params["select"][0]), &selects)
+ var selects []string
+ err := json.Unmarshal([]byte(req.Form.Get("select")), &selects)
if err != nil {
httpserver.Error(w, err.Error(), http.StatusBadRequest)
return true
}
for _, r := range selects {
- if r.(string) == "uuid" {
+ if r == "uuid" {
foundUUID = true
break
}
@@ -312,18 +318,18 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
}
}
- // Perform parallel requests to each cluster
+ // Perform concurrent requests to each cluster
- // use channel as a semaphore to limit the number of parallel
+ // use channel as a semaphore to limit the number of concurrent
// requests at a time
- sem := make(chan bool, h.handler.Cluster.ParallelRemoteRequests)
+ sem := make(chan bool, h.handler.FederatedRequestConcurrency())
defer close(sem)
wg := sync.WaitGroup{}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
mtx := sync.Mutex{}
errors := []error{}
- var completeResponses []interface{}
+ var completeResponses []map[string]interface{}
var kind string
for k, v := range queryClusters {
@@ -337,7 +343,7 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
sem <- true
wg.Add(1)
go func(k string, v []string) {
- rp, kn, err := h.remoteQueryUUIDs(w, req, params, k, v)
+ rp, kn, err := h.remoteQueryUUIDs(w, req, k, v)
mtx.Lock()
if err == nil {
completeResponses = append(completeResponses, rp...)
@@ -379,25 +385,15 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h
clusterId = m[2]
}
- // First, parse the query portion of the URL.
- var params url.Values
- var err error
- if params, err = url.ParseQuery(req.URL.RawQuery); err != nil {
+ // Get form parameters from URL and form body (if POST).
+ if err := loadParamsFromForm(req); err != nil {
httpserver.Error(w, err.Error(), http.StatusBadRequest)
return
}
- // Next, if appropriate, merge in parameters from the form POST body.
- if req.Method == "POST" && req.Header.Get("Content-Type") == "application/x-www-form-urlencoded" {
- if err = loadParamsFromForm(req, params); err != nil {
- httpserver.Error(w, err.Error(), http.StatusBadRequest)
- return
- }
- }
-
// Check if the parameters have an explicit cluster_id
- if len(params["cluster_id"]) == 1 {
- clusterId = params["cluster_id"][0]
+ if req.Form.Get("cluster_id") != "" {
+ clusterId = req.Form.Get("cluster_id")
}
// Handle the POST-as-GET special case (workaround for large
@@ -405,14 +401,15 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h
// like multi-object queries where the filter has 100s of
// items)
effectiveMethod := req.Method
- if req.Method == "POST" && len(params["_method"]) == 1 {
- effectiveMethod = params["_method"][0]
+ if req.Method == "POST" && req.Form.Get("_method") != "" {
+ effectiveMethod = req.Form.Get("_method")
}
- if effectiveMethod == "GET" && clusterId == "" && len(params["filters"]) == 1 {
- if h.handleMultiClusterQuery(w, req, params, &clusterId) {
- return
- }
+ if effectiveMethod == "GET" &&
+ clusterId == "" &&
+ req.Form.Get("filters") != "" &&
+ h.handleMultiClusterQuery(w, req, &clusterId) {
+ return
}
if clusterId == "" || clusterId == h.handler.Cluster.ClusterID {
@@ -667,9 +664,9 @@ func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req
var errors []string
var errorCode int = 404
- // use channel as a semaphore to limit the number of parallel
+ // use channel as a semaphore to limit the number of concurrent
// requests at a time
- sem := make(chan bool, h.handler.Cluster.ParallelRemoteRequests)
+ sem := make(chan bool, h.handler.FederatedRequestConcurrency())
defer close(sem)
for remoteID := range h.handler.Cluster.RemoteClusters {
// blocks until it can put a value into the
diff --git a/lib/controller/federation_test.go b/lib/controller/federation_test.go
index 0b62ce5ff..1b099be5e 100644
--- a/lib/controller/federation_test.go
+++ b/lib/controller/federation_test.go
@@ -63,8 +63,8 @@ func (s *FederationSuite) SetUpTest(c *check.C) {
NodeProfiles: map[string]arvados.NodeProfile{
"*": nodeProfile,
},
- MaxItemsPerResponse: 1000,
- ParallelRemoteRequests: 4,
+ MaxItemsPerResponse: 1000,
+ FederatedRequestConcurrency: 4,
}, NodeProfile: &nodeProfile}
s.testServer = newServerFromIntegrationTestEnv(c)
s.testServer.Server.Handler = httpserver.AddRequestIDs(httpserver.LogRequests(s.log, s.testHandler))
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index f309ac7bd..7101693f3 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -51,15 +51,15 @@ func (sc *Config) GetCluster(clusterID string) (*Cluster, error) {
}
type Cluster struct {
- ClusterID string `json:"-"`
- ManagementToken string
- NodeProfiles map[string]NodeProfile
- InstanceTypes InstanceTypeMap
- HTTPRequestTimeout Duration
- RemoteClusters map[string]RemoteCluster
- PostgreSQL PostgreSQL
- MaxItemsPerResponse int
- ParallelRemoteRequests int
+ ClusterID string `json:"-"`
+ ManagementToken string
+ NodeProfiles map[string]NodeProfile
+ InstanceTypes InstanceTypeMap
+ HTTPRequestTimeout Duration
+ RemoteClusters map[string]RemoteCluster
+ PostgreSQL PostgreSQL
+ MaxItemsPerResponse int
+ FederatedRequestConcurrency int
}
type PostgreSQL struct {
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list