[ARVADOS] updated: 76be616a8a65a6c574026583c462640dcc9e706f
Git user
git at public.curoverse.com
Wed Jun 15 15:42:24 EDT 2016
Summary of changes:
sdk/go/arvados/client.go | 88 +++++++++++++++++++++++--------------
sdk/go/arvados/client_test.go | 70 +++++++++++++++++++++++++++++
services/keep-balance/balance.go | 12 +++--
services/keep-balance/collection.go | 11 ++++-
services/keep-balance/main.go | 8 ++++
services/keep-balance/usage.go | 17 ++++++-
6 files changed, 164 insertions(+), 42 deletions(-)
via 76be616a8a65a6c574026583c462640dcc9e706f (commit)
via ed309f2bd237c32565879dbb1e9c42ea5caba61d (commit)
via 1478093ebd5749e67c179cb8c3391870edd13c32 (commit)
via ad272d78c6238b3c5cc250e643b8fda632b11d70 (commit)
via 4441a843f6c02f5434d05db7e1bae3cbc8683d69 (commit)
from ee89a901aa93fa07f6aecde91a2f76a7a6067d5e (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 76be616a8a65a6c574026583c462640dcc9e706f
Merge: ee89a90 ed309f2
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Jun 15 15:40:58 2016 -0400
Merge branch '9395-keep-balance-page-size'
closes #9395
commit ed309f2bd237c32565879dbb1e9c42ea5caba61d
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Jun 15 15:40:28 2016 -0400
9395: Fix encoding of integers in query params.
diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go
index ee7db23..d6d610d 100644
--- a/sdk/go/arvados/client.go
+++ b/sdk/go/arvados/client.go
@@ -3,8 +3,10 @@ package arvados
import (
"crypto/tls"
"encoding/json"
+ "fmt"
"io"
"io/ioutil"
+ "math"
"net/http"
"net/url"
"os"
@@ -80,6 +82,57 @@ func (c *Client) DoAndDecode(dst interface{}, req *http.Request) error {
return json.Unmarshal(buf, dst)
}
+// Convert an arbitrary struct to url.Values. For example,
+//
+// Foo{Bar: []int{1,2,3}, Baz: "waz"}
+//
+// becomes
+//
+// url.Values{`bar`:`{"a":[1,2,3]}`,`Baz`:`waz`}
+//
+// params itself is returned if it is already an url.Values.
+func anythingToValues(params interface{}) (url.Values, error) {
+ if v, ok := params.(url.Values); ok {
+ return v, nil
+ }
+ // TODO: Do this more efficiently, possibly using
+ // json.Decode/Encode, so the whole thing doesn't have to get
+ // encoded, decoded, and re-encoded.
+ j, err := json.Marshal(params)
+ if err != nil {
+ return nil, err
+ }
+ var generic map[string]interface{}
+ err = json.Unmarshal(j, &generic)
+ if err != nil {
+ return nil, err
+ }
+ urlValues := url.Values{}
+ for k, v := range generic {
+ if v, ok := v.(string); ok {
+ urlValues.Set(k, v)
+ continue
+ }
+ if v, ok := v.(float64); ok {
+ // Unmarshal decodes all numbers as float64,
+ // which can be written as 1.2345e4 in JSON,
+ // but this form is not accepted for ints in
+ // url params. If a number fits in an int64,
+ // encode it as int64 rather than float64.
+ if v, frac := math.Modf(v); frac == 0 && v <= math.MaxInt64 && v >= math.MinInt64 {
+ urlValues.Set(k, fmt.Sprintf("%d", int64(v)))
+ continue
+ }
+ }
+ j, err := json.Marshal(v)
+ if err != nil {
+ return nil, err
+ }
+ urlValues.Set(k, string(j))
+ }
+ return urlValues, nil
+}
+
// RequestAndDecode performs an API request and unmarshals the
// response (which must be JSON) into dst. Method and body arguments
// are the same as for http.NewRequest(). The given path is added to
@@ -89,38 +142,9 @@ func (c *Client) DoAndDecode(dst interface{}, req *http.Request) error {
// path must not contain a query string.
func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error {
urlString := c.apiURL(path)
- var urlValues url.Values
- if v, ok := params.(url.Values); ok {
- urlValues = v
- } else if params != nil {
- // Convert an arbitrary struct to url.Values. For
- // example, Foo{Bar: []int{1,2,3}, Baz: "waz"} becomes
- // url.Values{`bar`:`{"a":[1,2,3]}`,`Baz`:`waz`}
- //
- // TODO: Do this more efficiently, possibly using
- // json.Decode/Encode, so the whole thing doesn't have
- // to get encoded, decoded, and re-encoded.
- j, err := json.Marshal(params)
- if err != nil {
- return err
- }
- var generic map[string]interface{}
- err = json.Unmarshal(j, &generic)
- if err != nil {
- return err
- }
- urlValues = url.Values{}
- for k, v := range generic {
- if v, ok := v.(string); ok {
- urlValues.Set(k, v)
- continue
- }
- j, err := json.Marshal(v)
- if err != nil {
- return err
- }
- urlValues.Set(k, string(j))
- }
+ urlValues, err := anythingToValues(params)
+ if err != nil {
+ return err
}
if (method == "GET" || body != nil) && urlValues != nil {
// FIXME: what if params don't fit in URL
diff --git a/sdk/go/arvados/client_test.go b/sdk/go/arvados/client_test.go
index 2db50bf..422ad90 100644
--- a/sdk/go/arvados/client_test.go
+++ b/sdk/go/arvados/client_test.go
@@ -5,6 +5,7 @@ import (
"fmt"
"io/ioutil"
"net/http"
+ "net/url"
"sync"
"testing"
)
@@ -81,3 +82,72 @@ func TestCurrentUser(t *testing.T) {
t.Errorf("got nil error, expected something awful")
}
}
+
+func TestAnythingToValues(t *testing.T) {
+ type testCase struct {
+ in interface{}
+ // ok==nil means anythingToValues should return an
+ // error, otherwise it's a func that returns true if
+ // out is correct
+ ok func(out url.Values) bool
+ }
+ for _, tc := range []testCase{
+ {
+ in: map[string]interface{}{"foo": "bar"},
+ ok: func(out url.Values) bool {
+ return out.Get("foo") == "bar"
+ },
+ },
+ {
+ in: map[string]interface{}{"foo": 2147483647},
+ ok: func(out url.Values) bool {
+ return out.Get("foo") == "2147483647"
+ },
+ },
+ {
+ in: map[string]interface{}{"foo": 1.234},
+ ok: func(out url.Values) bool {
+ return out.Get("foo") == "1.234"
+ },
+ },
+ {
+ in: map[string]interface{}{"foo": "1.234"},
+ ok: func(out url.Values) bool {
+ return out.Get("foo") == "1.234"
+ },
+ },
+ {
+ in: map[string]interface{}{"foo": map[string]interface{}{"bar":1.234}},
+ ok: func(out url.Values) bool {
+ return out.Get("foo") == `{"bar":1.234}`
+ },
+ },
+ {
+ in: url.Values{"foo": {"bar"}},
+ ok: func(out url.Values) bool {
+ return out.Get("foo") == "bar"
+ },
+ },
+ {
+ in: 1234,
+ ok: nil,
+ },
+ {
+ in: []string{"foo"},
+ ok: nil,
+ },
+ } {
+ t.Logf("%#v", tc.in)
+ out, err := anythingToValues(tc.in)
+ switch {
+ case tc.ok == nil:
+ if err == nil {
+ t.Errorf("got %#v, expected error", out)
+ }
+ case err != nil:
+ t.Errorf("got err %#v, expected nil", err)
+ case !tc.ok(out):
+ t.Errorf("got %#v but tc.ok() says that is wrong", out)
+ }
+ }
+}
commit 1478093ebd5749e67c179cb8c3391870edd13c32
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Jun 15 10:53:01 2016 -0400
9395: Explain CollectionBatchSize and CollectionBuffers in -help message.
diff --git a/services/keep-balance/usage.go b/services/keep-balance/usage.go
index 2273f3a..b521c65 100644
--- a/services/keep-balance/usage.go
+++ b/services/keep-balance/usage.go
@@ -73,6 +73,19 @@ Committing:
Use the -commit-pull and -commit-trash flags to implement the
computed changes.
+Tuning resource usage:
+
+ CollectionBatchSize limits the number of collections retrieved per
+ API transaction. If this is zero or omitted, page size is
+ determined by the API server's own page size limits (see
+ max_items_per_response and max_index_database_read configs).
+
+ CollectionBuffers sets the size of an internal queue of
+ collections. Higher values use more memory, and improve throughput
+ by allowing keep-balance to fetch the next page of collections
+ while the current page is still being processed. If this is zero
+ or omitted, pages are processed serially.
+
Limitations:
keep-balance does not attempt to discover whether committed pull
commit ad272d78c6238b3c5cc250e643b8fda632b11d70
Author: Tom Clegg <tom at curoverse.com>
Date: Mon Jun 13 10:31:39 2016 -0400
9395: Add CollectionBuffers config to keep-balance.
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 0ca6e71..2d1a59e 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -80,7 +80,7 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (err error) {
return
}
}
- if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize); err != nil {
+ if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
return
}
bal.ComputeChangeSets()
@@ -190,7 +190,7 @@ func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
// collection manifests in the database (API server).
//
// It encodes the resulting information in BlockStateMap.
-func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize int) error {
+func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
defer timeMe(bal.Logger, "GetCurrentState")()
bal.BlockStateMap = NewBlockStateMap()
@@ -225,7 +225,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize int) error {
// collQ buffers incoming collections so we can start fetching
// the next page without waiting for the current page to
// finish processing.
- collQ := make(chan arvados.Collection, 1000)
+ collQ := make(chan arvados.Collection, bufs)
// Start a goroutine to process collections. (We could use a
// worker pool here, but even with a single worker we already
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
index 6e7d70d..364bb3f 100644
--- a/services/keep-balance/main.go
+++ b/services/keep-balance/main.go
@@ -31,6 +31,11 @@ type Config struct {
// Number of collections to request in each API call
CollectionBatchSize int
+
+ // Max collections to buffer in memory (bigger values consume
+ // more memory, but can reduce store-and-forward latency when
+ // fetching pages)
+ CollectionBuffers int
}
// RunOptions controls runtime behavior. The flags/options that belong
diff --git a/services/keep-balance/usage.go b/services/keep-balance/usage.go
index c536364..2273f3a 100644
--- a/services/keep-balance/usage.go
+++ b/services/keep-balance/usage.go
@@ -17,7 +17,8 @@ var exampleConfigFile = []byte(`
"disk"
],
"RunPeriod": "600s",
- "CollectionBatchSize": 100000
+ "CollectionBatchSize": 100000,
+ "CollectionBuffers": 1000
}`)
func usage() {
commit 4441a843f6c02f5434d05db7e1bae3cbc8683d69
Author: Tom Clegg <tom at curoverse.com>
Date: Mon Jun 13 10:30:23 2016 -0400
9395: Add CollectionBatchSize config to keep-balance.
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 2a2480c..0ca6e71 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -80,7 +80,7 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (err error) {
return
}
}
- if err = bal.GetCurrentState(&config.Client); err != nil {
+ if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize); err != nil {
return
}
bal.ComputeChangeSets()
@@ -190,7 +190,7 @@ func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
// collection manifests in the database (API server).
//
// It encodes the resulting information in BlockStateMap.
-func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
+func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize int) error {
defer timeMe(bal.Logger, "GetCurrentState")()
bal.BlockStateMap = NewBlockStateMap()
@@ -224,9 +224,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
// collQ buffers incoming collections so we can start fetching
// the next page without waiting for the current page to
- // finish processing. (1000 happens to match the page size
- // used by (*arvados.Client)EachCollection(), but it's OK if
- // they don't match.)
+ // finish processing.
collQ := make(chan arvados.Collection, 1000)
// Start a goroutine to process collections. (We could use a
@@ -252,7 +250,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
wg.Add(1)
go func() {
defer wg.Done()
- err = EachCollection(c,
+ err = EachCollection(c, pageSize,
func(coll arvados.Collection) error {
collQ <- coll
if len(errs) > 0 {
diff --git a/services/keep-balance/collection.go b/services/keep-balance/collection.go
index e6a1f08..f4fc721 100644
--- a/services/keep-balance/collection.go
+++ b/services/keep-balance/collection.go
@@ -22,7 +22,10 @@ func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int
// The progress function is called periodically with done (number of
// times f has been called) and total (number of times f is expected
// to be called).
-func EachCollection(c *arvados.Client, f func(arvados.Collection) error, progress func(done, total int)) error {
+//
+// If pageSize > 0 it is used as the maximum page size in each API
+// call; otherwise the maximum allowed page size is requested.
+func EachCollection(c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
if progress == nil {
progress = func(_, _ int) {}
}
@@ -32,7 +35,11 @@ func EachCollection(c *arvados.Client, f func(arvados.Collection) error, progres
return err
}
- limit := 1000
+ limit := pageSize
+ if limit <= 0 {
+ // Use the maximum page size the server allows
+ limit = 1<<31 - 1
+ }
params := arvados.ResourceListParams{
Limit: &limit,
Order: "modified_at, uuid",
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
index 42a8d63..6e7d70d 100644
--- a/services/keep-balance/main.go
+++ b/services/keep-balance/main.go
@@ -28,6 +28,9 @@ type Config struct {
// How often to check
RunPeriod arvados.Duration
+
+ // Number of collections to request in each API call
+ CollectionBatchSize int
}
// RunOptions controls runtime behavior. The flags/options that belong
diff --git a/services/keep-balance/usage.go b/services/keep-balance/usage.go
index eb9990c..c536364 100644
--- a/services/keep-balance/usage.go
+++ b/services/keep-balance/usage.go
@@ -16,7 +16,8 @@ var exampleConfigFile = []byte(`
"KeepServiceTypes": [
"disk"
],
- "RunPeriod": "600s"
+ "RunPeriod": "600s",
+ "CollectionBatchSize": 100000
}`)
func usage() {
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list