[ARVADOS] updated: c900f416c36cd74675c5bf4c33ad1dbe5d1e78fa
Git user
git at public.curoverse.com
Tue May 31 16:24:13 EDT 2016
Summary of changes:
build/package-build-dockerfiles/Makefile | 8 +-
build/package-build-dockerfiles/centos6/Dockerfile | 2 +-
build/package-build-dockerfiles/debian7/Dockerfile | 2 +-
build/package-build-dockerfiles/debian8/Dockerfile | 2 +-
.../ubuntu1204/Dockerfile | 2 +-
.../ubuntu1404/Dockerfile | 2 +-
build/run-build-packages-one-target.sh | 3 +-
build/run-build-packages.sh | 2 +
build/run-tests.sh | 8 +-
sdk/go/arvados/client.go | 169 ++++++
sdk/go/arvados/client_test.go | 83 +++
sdk/go/arvados/collection.go | 62 ++
sdk/go/arvados/doc.go | 12 +
sdk/go/arvados/duration.go | 31 +
sdk/go/arvados/keep_block.go | 15 +
sdk/go/arvados/keep_service.go | 123 ++++
sdk/go/arvados/resource_list.go | 25 +
sdk/go/arvados/resource_list_test.go | 21 +
sdk/go/arvados/user.go | 17 +
services/keep-balance/balance.go | 638 +++++++++++++++++++++
services/keep-balance/balance_run_test.go | 374 ++++++++++++
services/keep-balance/balance_test.go | 255 ++++++++
services/keep-balance/block_state.go | 95 +++
services/keep-balance/change_set.go | 75 +++
services/keep-balance/change_set_test.go | 35 ++
services/keep-balance/collection.go | 95 +++
services/keep-balance/integration_test.go | 92 +++
services/keep-balance/keep_service.go | 76 +++
services/keep-balance/main.go | 156 +++++
services/keep-balance/main_test.go | 43 ++
services/keep-balance/time_me.go | 14 +
services/keep-balance/usage.go | 83 +++
services/keepstore/keepstore.go | 7 +-
33 files changed, 2616 insertions(+), 11 deletions(-)
create mode 100644 sdk/go/arvados/client.go
create mode 100644 sdk/go/arvados/client_test.go
create mode 100644 sdk/go/arvados/collection.go
create mode 100644 sdk/go/arvados/doc.go
create mode 100644 sdk/go/arvados/duration.go
create mode 100644 sdk/go/arvados/keep_block.go
create mode 100644 sdk/go/arvados/keep_service.go
create mode 100644 sdk/go/arvados/resource_list.go
create mode 100644 sdk/go/arvados/resource_list_test.go
create mode 100644 sdk/go/arvados/user.go
create mode 100644 services/keep-balance/balance.go
create mode 100644 services/keep-balance/balance_run_test.go
create mode 100644 services/keep-balance/balance_test.go
create mode 100644 services/keep-balance/block_state.go
create mode 100644 services/keep-balance/change_set.go
create mode 100644 services/keep-balance/change_set_test.go
create mode 100644 services/keep-balance/collection.go
create mode 100644 services/keep-balance/integration_test.go
create mode 100644 services/keep-balance/keep_service.go
create mode 100644 services/keep-balance/main.go
create mode 100644 services/keep-balance/main_test.go
create mode 100644 services/keep-balance/time_me.go
create mode 100644 services/keep-balance/usage.go
via c900f416c36cd74675c5bf4c33ad1dbe5d1e78fa (commit)
via 27f948d88bbdd5be848011871d2b592d7057ece1 (commit)
via b002129afda08bbb4fdbed6e629858a5c298c068 (commit)
from ae72b172c8bf8a52358a89f8a8d744ec5bf2d993 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit c900f416c36cd74675c5bf4c33ad1dbe5d1e78fa
Merge: ae72b17 27f948d
Author: Tom Clegg <tom at curoverse.com>
Date: Tue May 31 16:23:30 2016 -0400
Merge branch '9162-keep-balance'
closes #9162
commit 27f948d88bbdd5be848011871d2b592d7057ece1
Author: Tom Clegg <tom at curoverse.com>
Date: Tue May 24 10:02:39 2016 -0400
9162: Add replication level histogram
Ported from 00a8ece1580a894dbbf9f756685eefc134e4d0d6 by jrandall
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 7471aa6..2a2480c 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -3,6 +3,7 @@ package main
import (
"fmt"
"log"
+ "math"
"os"
"runtime"
"strings"
@@ -447,9 +448,11 @@ type balancerStats struct {
lost, overrep, unref, garbage, underrep, justright blocksNBytes
desired, current blocksNBytes
pulls, trashes int
+ replHistogram []int
}
func (bal *Balancer) getStatistics() (s balancerStats) {
+ s.replHistogram = make([]int, 2)
bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
surplus := len(blk.Replicas) - blk.Desired
bytes := blkid.Size()
@@ -493,6 +496,11 @@ func (bal *Balancer) getStatistics() (s balancerStats) {
s.current.blocks++
s.current.bytes += bytes * int64(len(blk.Replicas))
}
+
+ for len(s.replHistogram) <= len(blk.Replicas) {
+ s.replHistogram = append(s.replHistogram, 0)
+ }
+ s.replHistogram[len(blk.Replicas)]++
})
for _, srv := range bal.KeepServices {
s.pulls += len(srv.ChangeSet.Pulls)
@@ -521,6 +529,25 @@ func (bal *Balancer) PrintStatistics() {
bal.logf("%s: %v\n", srv, srv.ChangeSet)
}
bal.logf("===")
+ bal.printHistogram(s, 60)
+ bal.logf("===")
+}
+
+func (bal *Balancer) printHistogram(s balancerStats, hashColumns int) {
+ bal.logf("Replication level distribution (counting N replicas on a single server as N):")
+ maxCount := 0
+ for _, count := range s.replHistogram {
+ if maxCount < count {
+ maxCount = count
+ }
+ }
+ hashes := strings.Repeat("#", hashColumns)
+ countWidth := 1 + int(math.Log10(float64(maxCount+1)))
+ scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
+ for repl, count := range s.replHistogram {
+ nHashes := int(scaleCount * math.Log10(float64(count+1)))
+ bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
+ }
}
// CheckSanityLate checks for configuration and runtime errors after
commit b002129afda08bbb4fdbed6e629858a5c298c068
Author: Tom Clegg <tom at curoverse.com>
Date: Mon May 16 17:09:21 2016 -0400
9162: Add keep-balance
diff --git a/build/package-build-dockerfiles/Makefile b/build/package-build-dockerfiles/Makefile
index 9216f82..3482886 100644
--- a/build/package-build-dockerfiles/Makefile
+++ b/build/package-build-dockerfiles/Makefile
@@ -20,10 +20,12 @@ ubuntu1404/generated: common-generated-all
test -d ubuntu1404/generated || mkdir ubuntu1404/generated
cp -rlt ubuntu1404/generated common-generated/*
-common-generated-all: common-generated/golang-amd64.tar.gz
+GOTARBALL=go1.6.2.linux-amd64.tar.gz
-common-generated/golang-amd64.tar.gz: common-generated
- wget -cqO common-generated/golang-amd64.tar.gz http://storage.googleapis.com/golang/go1.4.2.linux-amd64.tar.gz
+common-generated-all: common-generated/$(GOTARBALL)
+
+common-generated/$(GOTARBALL): common-generated
+ wget -cqO common-generated/$(GOTARBALL) http://storage.googleapis.com/golang/$(GOTARBALL)
common-generated:
mkdir common-generated
diff --git a/build/package-build-dockerfiles/centos6/Dockerfile b/build/package-build-dockerfiles/centos6/Dockerfile
index c21c091..570dde1 100644
--- a/build/package-build-dockerfiles/centos6/Dockerfile
+++ b/build/package-build-dockerfiles/centos6/Dockerfile
@@ -5,7 +5,7 @@ MAINTAINER Brett Smith <brett at curoverse.com>
RUN yum -q -y install make automake gcc gcc-c++ libyaml-devel patch readline-devel zlib-devel libffi-devel openssl-devel bzip2 libtool bison sqlite-devel rpm-build git perl-ExtUtils-MakeMaker libattr-devel nss-devel libcurl-devel which tar unzip scl-utils centos-release-scl postgresql-devel
# Install golang binary
-ADD generated/golang-amd64.tar.gz /usr/local/
+ADD generated/go1.6.2.linux-amd64.tar.gz /usr/local/
RUN ln -s /usr/local/go/bin/go /usr/local/bin/
# Install RVM
diff --git a/build/package-build-dockerfiles/debian7/Dockerfile b/build/package-build-dockerfiles/debian7/Dockerfile
index ccc444c..ddad542 100644
--- a/build/package-build-dockerfiles/debian7/Dockerfile
+++ b/build/package-build-dockerfiles/debian7/Dockerfile
@@ -13,7 +13,7 @@ RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
/usr/local/rvm/bin/rvm-exec default gem install cure-fpm --version 1.6.0b
# Install golang binary
-ADD generated/golang-amd64.tar.gz /usr/local/
+ADD generated/go1.6.2.linux-amd64.tar.gz /usr/local/
RUN ln -s /usr/local/go/bin/go /usr/local/bin/
ENV WORKSPACE /arvados
diff --git a/build/package-build-dockerfiles/debian8/Dockerfile b/build/package-build-dockerfiles/debian8/Dockerfile
index e32cfb4..80f06a2 100644
--- a/build/package-build-dockerfiles/debian8/Dockerfile
+++ b/build/package-build-dockerfiles/debian8/Dockerfile
@@ -13,7 +13,7 @@ RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
/usr/local/rvm/bin/rvm-exec default gem install cure-fpm --version 1.6.0b
# Install golang binary
-ADD generated/golang-amd64.tar.gz /usr/local/
+ADD generated/go1.6.2.linux-amd64.tar.gz /usr/local/
RUN ln -s /usr/local/go/bin/go /usr/local/bin/
ENV WORKSPACE /arvados
diff --git a/build/package-build-dockerfiles/ubuntu1204/Dockerfile b/build/package-build-dockerfiles/ubuntu1204/Dockerfile
index 34bf698..2f628b0 100644
--- a/build/package-build-dockerfiles/ubuntu1204/Dockerfile
+++ b/build/package-build-dockerfiles/ubuntu1204/Dockerfile
@@ -13,7 +13,7 @@ RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
/usr/local/rvm/bin/rvm-exec default gem install cure-fpm --version 1.6.0b
# Install golang binary
-ADD generated/golang-amd64.tar.gz /usr/local/
+ADD generated/go1.6.2.linux-amd64.tar.gz /usr/local/
RUN ln -s /usr/local/go/bin/go /usr/local/bin/
ENV WORKSPACE /arvados
diff --git a/build/package-build-dockerfiles/ubuntu1404/Dockerfile b/build/package-build-dockerfiles/ubuntu1404/Dockerfile
index 5377164..b9c003a 100644
--- a/build/package-build-dockerfiles/ubuntu1404/Dockerfile
+++ b/build/package-build-dockerfiles/ubuntu1404/Dockerfile
@@ -13,7 +13,7 @@ RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
/usr/local/rvm/bin/rvm-exec default gem install cure-fpm --version 1.6.0b
# Install golang binary
-ADD generated/golang-amd64.tar.gz /usr/local/
+ADD generated/go1.6.2.linux-amd64.tar.gz /usr/local/
RUN ln -s /usr/local/go/bin/go /usr/local/bin/
ENV WORKSPACE /arvados
diff --git a/build/run-build-packages-one-target.sh b/build/run-build-packages-one-target.sh
index 322129e..6fdffd0 100755
--- a/build/run-build-packages-one-target.sh
+++ b/build/run-build-packages-one-target.sh
@@ -128,9 +128,10 @@ if test -z "$packages" ; then
arvados-src
arvados-workbench
crunchstat
+ keep-balance
+ keep-block-check
keepproxy
keep-rsync
- keep-block-check
keepstore
keep-web
libarvados-perl"
diff --git a/build/run-build-packages.sh b/build/run-build-packages.sh
index a51198f..7631718 100755
--- a/build/run-build-packages.sh
+++ b/build/run-build-packages.sh
@@ -384,6 +384,8 @@ package_go_binary services/keepstore keepstore \
"Keep storage daemon, accessible to clients on the LAN"
package_go_binary services/keepproxy keepproxy \
"Make a Keep cluster accessible to clients that are not on the LAN"
+package_go_binary services/keep-balance keep-balance \
+ "Rebalance and garbage-collect data blocks stored in Arvados Keep"
package_go_binary services/keep-web keep-web \
"Static web hosting service for user data stored in Arvados Keep"
package_go_binary services/datamanager arvados-data-manager \
diff --git a/build/run-tests.sh b/build/run-tests.sh
index d229341..30a80f5 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -70,6 +70,7 @@ services/fuse
services/keep-web
services/keepproxy
services/keepstore
+services/keep-balance
services/login-sync
services/nodemanager
services/crunch-run
@@ -79,6 +80,7 @@ sdk/cli
sdk/pam
sdk/python
sdk/ruby
+sdk/go/arvados
sdk/go/arvadosclient
sdk/go/keepclient
sdk/go/httpserver
@@ -150,6 +152,8 @@ sanity_checks() {
echo -n 'go: '
go version \
|| fatal "No go binary. See http://golang.org/doc/install"
+ [[ $(go version) =~ go1.([0-9]+) ]] && [[ ${BASH_REMATCH[1]} -ge 6 ]] \
+ || fatal "Go >= 1.6 required. See http://golang.org/doc/install"
echo -n 'gcc: '
gcc --version | egrep ^gcc \
|| fatal "No gcc. Try: apt-get install build-essential"
@@ -499,7 +503,7 @@ do_test_once() {
then
# "go test -check.vv giturl" doesn't work, but this
# does:
- cd "$WORKSPACE/$1" && go test ${short:+-short} ${coverflags[@]} ${testargs[$1]}
+ cd "$WORKSPACE/$1" && go test ${short:+-short} ${testargs[$1]}
else
# The above form gets verbose even when testargs is
# empty, so use this form in such cases:
@@ -703,6 +707,7 @@ do_install services/api apiserver
declare -a gostuff
gostuff=(
+ sdk/go/arvados
sdk/go/arvadosclient
sdk/go/blockdigest
sdk/go/httpserver
@@ -714,6 +719,7 @@ gostuff=(
services/keep-web
services/keepstore
sdk/go/keepclient
+ services/keep-balance
services/keepproxy
services/datamanager/summary
services/datamanager/collection
diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go
new file mode 100644
index 0000000..ee830c8
--- /dev/null
+++ b/sdk/go/arvados/client.go
@@ -0,0 +1,169 @@
+package arvados
+
+import (
+ "crypto/tls"
+ "encoding/json"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "net/url"
+ "os"
+)
+
+// A Client is an HTTP client with an API endpoint and a set of
+// Arvados credentials.
+//
+// It offers methods for accessing individual Arvados APIs, and
+// methods that implement common patterns like fetching multiple pages
+// of results using List APIs.
+type Client struct {
+ // HTTP client used to make requests. If nil,
+ // http.DefaultClient or InsecureHTTPClient will be used.
+ Client *http.Client
+
+ // Hostname (or host:port) of Arvados API server.
+ APIHost string
+
+ // User authentication token.
+ AuthToken string
+
+ // Accept unverified certificates. This works only if the
+ // Client field is nil: otherwise, it has no effect.
+ Insecure bool
+}
+
+// The default http.Client used by a Client with Insecure==true and
+// Client==nil.
+var InsecureHTTPClient = &http.Client{
+ Transport: &http.Transport{
+ TLSClientConfig: &tls.Config{
+ InsecureSkipVerify: true}}}
+
+// NewClientFromEnv creates a new Client that uses the default HTTP
+// client with the API endpoint and credentials given by the
+// ARVADOS_API_* environment variables.
+func NewClientFromEnv() *Client {
+ return &Client{
+ APIHost: os.Getenv("ARVADOS_API_HOST"),
+ AuthToken: os.Getenv("ARVADOS_API_TOKEN"),
+ Insecure: os.Getenv("ARVADOS_API_HOST_INSECURE") != "",
+ }
+}
+
+// Do adds authentication headers and then calls (*http.Client)Do().
+func (c *Client) Do(req *http.Request) (*http.Response, error) {
+ if c.AuthToken != "" {
+ req.Header.Add("Authorization", "OAuth2 "+c.AuthToken)
+ }
+ return c.httpClient().Do(req)
+}
+
+// DoAndDecode performs req and unmarshals the response (which must be
+// JSON) into dst. Use this instead of RequestAndDecode if you need
+// more control of the http.Request object.
+func (c *Client) DoAndDecode(dst interface{}, req *http.Request) error {
+ resp, err := c.Do(req)
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+ buf, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return err
+ }
+ if resp.StatusCode != 200 {
+ return fmt.Errorf("request failed (%s): %s", req.URL, resp.Status)
+ }
+ if dst == nil {
+ return nil
+ }
+ return json.Unmarshal(buf, dst)
+}
+
+// RequestAndDecode performs an API request and unmarshals the
+// response (which must be JSON) into dst. Method and body arguments
+// are the same as for http.NewRequest(). The given path is added to
+// the server's scheme/host/port to form the request URL. The given
+// params are passed via POST form or query string.
+//
+// path must not contain a query string.
+func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error {
+ urlString := c.apiURL(path)
+ var urlValues url.Values
+ if v, ok := params.(url.Values); ok {
+ urlValues = v
+ } else if params != nil {
+ // Convert an arbitrary struct to url.Values. For
+ // example, Foo{Bar: []int{1,2,3}, Baz: "waz"} becomes
+ // url.Values{`bar`:`{"a":[1,2,3]}`,`Baz`:`waz`}
+ //
+ // TODO: Do this more efficiently, possibly using
+ // json.Decode/Encode, so the whole thing doesn't have
+ // to get encoded, decoded, and re-encoded.
+ j, err := json.Marshal(params)
+ if err != nil {
+ return err
+ }
+ var generic map[string]interface{}
+ err = json.Unmarshal(j, &generic)
+ if err != nil {
+ return err
+ }
+ urlValues = url.Values{}
+ for k, v := range generic {
+ if v, ok := v.(string); ok {
+ urlValues.Set(k, v)
+ continue
+ }
+ j, err := json.Marshal(v)
+ if err != nil {
+ return err
+ }
+ urlValues.Set(k, string(j))
+ }
+ }
+ if (method == "GET" || body != nil) && urlValues != nil {
+ // FIXME: what if params don't fit in URL
+ u, err := url.Parse(urlString)
+ if err != nil {
+ return err
+ }
+ u.RawQuery = urlValues.Encode()
+ urlString = u.String()
+ }
+ req, err := http.NewRequest(method, urlString, body)
+ if err != nil {
+ return err
+ }
+ return c.DoAndDecode(dst, req)
+}
+
+func (c *Client) httpClient() *http.Client {
+ switch {
+ case c.Client != nil:
+ return c.Client
+ case c.Insecure:
+ return InsecureHTTPClient
+ default:
+ return http.DefaultClient
+ }
+}
+
+func (c *Client) apiURL(path string) string {
+ return "https://" + c.APIHost + "/" + path
+}
+
+// DiscoveryDocument is the Arvados server's description of itself.
+type DiscoveryDocument struct {
+ DefaultCollectionReplication int `json:"defaultCollectionReplication"`
+ BlobSignatureTTL int64 `json:"blobSignatureTtl"`
+}
+
+// DiscoveryDocument returns a *DiscoveryDocument. The returned object
+// should not be modified: the same object may be returned by
+// subsequent calls.
+func (c *Client) DiscoveryDocument() (*DiscoveryDocument, error) {
+ var dd DiscoveryDocument
+ return &dd, c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil)
+}
diff --git a/sdk/go/arvados/client_test.go b/sdk/go/arvados/client_test.go
new file mode 100644
index 0000000..2db50bf
--- /dev/null
+++ b/sdk/go/arvados/client_test.go
@@ -0,0 +1,83 @@
+package arvados
+
+import (
+ "bytes"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "sync"
+ "testing"
+)
+
+type stubTransport struct {
+ Responses map[string]string
+ Requests []http.Request
+ sync.Mutex
+}
+
+func (stub *stubTransport) RoundTrip(req *http.Request) (*http.Response, error) {
+ stub.Lock()
+ stub.Requests = append(stub.Requests, *req)
+ stub.Unlock()
+
+ resp := &http.Response{
+ Status: "200 OK",
+ StatusCode: 200,
+ Proto: "HTTP/1.1",
+ ProtoMajor: 1,
+ ProtoMinor: 1,
+ Request: req,
+ }
+ str := stub.Responses[req.URL.Path]
+ if str == "" {
+ resp.Status = "404 Not Found"
+ resp.StatusCode = 404
+ str = "{}"
+ }
+ buf := bytes.NewBufferString(str)
+ resp.Body = ioutil.NopCloser(buf)
+ resp.ContentLength = int64(buf.Len())
+ return resp, nil
+}
+
+type errorTransport struct{}
+
+func (stub *errorTransport) RoundTrip(req *http.Request) (*http.Response, error) {
+ return nil, fmt.Errorf("something awful happened")
+}
+
+func TestCurrentUser(t *testing.T) {
+ t.Parallel()
+ stub := &stubTransport{
+ Responses: map[string]string{
+ "/arvados/v1/users/current": `{"uuid":"zzzzz-abcde-012340123401234"}`,
+ },
+ }
+ c := &Client{
+ Client: &http.Client{
+ Transport: stub,
+ },
+ APIHost: "zzzzz.arvadosapi.com",
+ AuthToken: "xyzzy",
+ }
+ u, err := c.CurrentUser()
+ if err != nil {
+ t.Fatal(err)
+ }
+ if x := "zzzzz-abcde-012340123401234"; u.UUID != x {
+ t.Errorf("got uuid %q, expected %q", u.UUID, x)
+ }
+ if len(stub.Requests) < 1 {
+ t.Fatal("empty stub.Requests")
+ }
+ hdr := stub.Requests[len(stub.Requests)-1].Header
+ if hdr.Get("Authorization") != "OAuth2 xyzzy" {
+ t.Errorf("got headers %+q, expected Authorization header", hdr)
+ }
+
+ c.Client.Transport = &errorTransport{}
+ u, err = c.CurrentUser()
+ if err == nil {
+ t.Errorf("got nil error, expected something awful")
+ }
+}
diff --git a/sdk/go/arvados/collection.go b/sdk/go/arvados/collection.go
new file mode 100644
index 0000000..71f5247
--- /dev/null
+++ b/sdk/go/arvados/collection.go
@@ -0,0 +1,62 @@
+package arvados
+
+import (
+ "bufio"
+ "fmt"
+ "strings"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/manifest"
+)
+
+// Collection is an arvados#collection resource.
+type Collection struct {
+ UUID string `json:"uuid,omitempty"`
+ ExpiresAt *time.Time `json:"expires_at,omitempty"`
+ ManifestText string `json:"manifest_text,omitempty"`
+ CreatedAt *time.Time `json:"created_at,omitempty"`
+ ModifiedAt *time.Time `json:"modified_at,omitempty"`
+ PortableDataHash string `json:"portable_data_hash,omitempty"`
+ ReplicationConfirmed *int `json:"replication_confirmed,omitempty"`
+ ReplicationConfirmedAt *time.Time `json:"replication_confirmed_at,omitempty"`
+ ReplicationDesired *int `json:"replication_desired,omitempty"`
+}
+
+// SizedDigests returns the hash+size part of each data block
+// referenced by the collection.
+func (c *Collection) SizedDigests() ([]SizedDigest, error) {
+ if c.ManifestText == "" && c.PortableDataHash != "d41d8cd98f00b204e9800998ecf8427e+0" {
+ // TODO: Check more subtle forms of corruption, too
+ return nil, fmt.Errorf("manifest is missing")
+ }
+ var sds []SizedDigest
+ scanner := bufio.NewScanner(strings.NewReader(c.ManifestText))
+ scanner.Buffer(make([]byte, 1048576), len(c.ManifestText))
+ for scanner.Scan() {
+ line := scanner.Text()
+ tokens := strings.Split(line, " ")
+ if len(tokens) < 3 {
+ return nil, fmt.Errorf("Invalid stream (<3 tokens): %q", line)
+ }
+ for _, token := range tokens[1:] {
+ if !manifest.LocatorPattern.MatchString(token) {
+ // FIXME: ensure it's a file token
+ break
+ }
+ // FIXME: shouldn't assume 32 char hash
+ if i := strings.IndexRune(token[33:], '+'); i >= 0 {
+ token = token[:33+i]
+ }
+ sds = append(sds, SizedDigest(token))
+ }
+ }
+ return sds, scanner.Err()
+}
+
+// CollectionList is an arvados#collectionList resource.
+type CollectionList struct {
+ Items []Collection `json:"items"`
+ ItemsAvailable int `json:"items_available"`
+ Offset int `json:"offset"`
+ Limit int `json:"limit"`
+}
diff --git a/sdk/go/arvados/doc.go b/sdk/go/arvados/doc.go
new file mode 100644
index 0000000..1e8141e
--- /dev/null
+++ b/sdk/go/arvados/doc.go
@@ -0,0 +1,12 @@
+// Package arvados is a client library for Arvados.
+//
+// The API is not stable: it should be considered experimental
+// pre-release.
+//
+// The intent is to offer model types and API call functions that can
+// be generated automatically (or at least mostly automatically) from
+// a discovery document. For the time being, there is a manually
+// generated subset of those types and API calls with (approximately)
+// the right signatures, plus client/authentication support and some
+// convenience functions.
+package arvados
diff --git a/sdk/go/arvados/duration.go b/sdk/go/arvados/duration.go
new file mode 100644
index 0000000..1639c58
--- /dev/null
+++ b/sdk/go/arvados/duration.go
@@ -0,0 +1,31 @@
+package arvados
+
+import (
+ "encoding/json"
+ "fmt"
+ "time"
+)
+
+// Duration is time.Duration but looks like "12s" in JSON, rather than
+// a number of nanoseconds.
+type Duration time.Duration
+
+// UnmarshalJSON implements json.Unmarshaler
+func (d *Duration) UnmarshalJSON(data []byte) error {
+ if data[0] == '"' {
+ dur, err := time.ParseDuration(string(data[1 : len(data)-1]))
+ *d = Duration(dur)
+ return err
+ }
+ return fmt.Errorf("duration must be given as a string like \"600s\" or \"1h30m\"")
+}
+
+// MarshalJSON implements json.Marshaler
+func (d *Duration) MarshalJSON() ([]byte, error) {
+ return json.Marshal(d.String())
+}
+
+// String implements fmt.Stringer
+func (d Duration) String() string {
+ return time.Duration(d).String()
+}
diff --git a/sdk/go/arvados/keep_block.go b/sdk/go/arvados/keep_block.go
new file mode 100644
index 0000000..c9a7712
--- /dev/null
+++ b/sdk/go/arvados/keep_block.go
@@ -0,0 +1,15 @@
+package arvados
+
+import (
+ "strconv"
+ "strings"
+)
+
+// SizedDigest is a minimal Keep block locator: hash+size
+type SizedDigest string
+
+// Size returns the size of the data block, in bytes.
+func (sd SizedDigest) Size() int64 {
+ n, _ := strconv.ParseInt(strings.Split(string(sd), "+")[1], 10, 64)
+ return n
+}
diff --git a/sdk/go/arvados/keep_service.go b/sdk/go/arvados/keep_service.go
new file mode 100644
index 0000000..4af1b79
--- /dev/null
+++ b/sdk/go/arvados/keep_service.go
@@ -0,0 +1,123 @@
+package arvados
+
+import (
+ "bufio"
+ "fmt"
+ "net/http"
+ "strconv"
+ "strings"
+)
+
+// KeepService is an arvados#keepService record
+type KeepService struct {
+ UUID string `json:"uuid"`
+ ServiceHost string `json:"service_host"`
+ ServicePort int `json:"service_port"`
+ ServiceSSLFlag bool `json:"service_ssl_flag"`
+ ServiceType string `json:"service_type"`
+ ReadOnly bool `json:"read_only"`
+}
+
+// KeepServiceList is an arvados#keepServiceList record
+type KeepServiceList struct {
+ Items []KeepService `json:"items"`
+ ItemsAvailable int `json:"items_available"`
+ Offset int `json:"offset"`
+ Limit int `json:"limit"`
+}
+
+// KeepServiceIndexEntry is what a keep service's index response tells
+// us about a stored block.
+type KeepServiceIndexEntry struct {
+ SizedDigest
+ Mtime int64
+}
+
+// EachKeepService calls f once for every readable
+// KeepService. EachKeepService stops if it encounters an
+// error, such as f returning a non-nil error.
+func (c *Client) EachKeepService(f func(KeepService) error) error {
+ params := ResourceListParams{}
+ for {
+ var page KeepServiceList
+ err := c.RequestAndDecode(&page, "GET", "arvados/v1/keep_services", nil, params)
+ if err != nil {
+ return err
+ }
+ for _, item := range page.Items {
+ err = f(item)
+ if err != nil {
+ return err
+ }
+ }
+ params.Offset = params.Offset + len(page.Items)
+ if params.Offset >= page.ItemsAvailable {
+ return nil
+ }
+ }
+}
+
+func (s *KeepService) url(path string) string {
+ var f string
+ if s.ServiceSSLFlag {
+ f = "https://%s:%d/%s"
+ } else {
+ f = "http://%s:%d/%s"
+ }
+ return fmt.Sprintf(f, s.ServiceHost, s.ServicePort, path)
+}
+
+// String implements fmt.Stringer
+func (s *KeepService) String() string {
+ return s.UUID
+}
+
+// Index returns an unsorted list of blocks that can be retrieved from
+// this server.
+func (s *KeepService) Index(c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
+ url := s.url("index/" + prefix)
+ req, err := http.NewRequest("GET", url, nil)
+ if err != nil {
+ return nil, fmt.Errorf("NewRequest(%v): %v", url, err)
+ }
+ resp, err := c.Do(req)
+ if err != nil {
+ return nil, fmt.Errorf("Do(%v): %v", url, err)
+ } else if resp.StatusCode != 200 {
+ return nil, fmt.Errorf("%v: %v", url, resp.Status)
+ }
+ defer resp.Body.Close()
+
+ var entries []KeepServiceIndexEntry
+ scanner := bufio.NewScanner(resp.Body)
+ sawEOF := false
+ for scanner.Scan() {
+ if sawEOF {
+ return nil, fmt.Errorf("Index response contained non-terminal blank line")
+ }
+ line := scanner.Text()
+ if line == "" {
+ sawEOF = true
+ continue
+ }
+ fields := strings.Split(line, " ")
+ if len(fields) != 2 {
+ return nil, fmt.Errorf("Malformed index line %q: %d fields", line, len(fields))
+ }
+ mtime, err := strconv.ParseInt(fields[1], 10, 64)
+ if err != nil {
+ return nil, fmt.Errorf("Malformed index line %q: mtime: %v", line, err)
+ }
+ entries = append(entries, KeepServiceIndexEntry{
+ SizedDigest: SizedDigest(fields[0]),
+ Mtime: mtime,
+ })
+ }
+ if err := scanner.Err(); err != nil {
+ return nil, fmt.Errorf("Error scanning index response: %v", err)
+ }
+ if !sawEOF {
+ return nil, fmt.Errorf("Index response had no EOF marker")
+ }
+ return entries, nil
+}
diff --git a/sdk/go/arvados/resource_list.go b/sdk/go/arvados/resource_list.go
new file mode 100644
index 0000000..e9ea268
--- /dev/null
+++ b/sdk/go/arvados/resource_list.go
@@ -0,0 +1,25 @@
+package arvados
+
+import "encoding/json"
+
+// ResourceListParams expresses which results are requested in a
+// list/index API.
+type ResourceListParams struct {
+ Select []string `json:"select,omitempty"`
+ Filters []Filter `json:"filters,omitempty"`
+ Limit *int `json:"limit,omitempty"`
+ Offset int `json:"offset,omitempty"`
+ Order string `json:"order,omitempty"`
+}
+
+// A Filter restricts the set of records returned by a list/index API.
+type Filter struct {
+ Attr string
+ Operator string
+ Operand interface{}
+}
+
+// MarshalJSON encodes a Filter in the form expected by the API.
+func (f *Filter) MarshalJSON() ([]byte, error) {
+ return json.Marshal([]interface{}{f.Attr, f.Operator, f.Operand})
+}
diff --git a/sdk/go/arvados/resource_list_test.go b/sdk/go/arvados/resource_list_test.go
new file mode 100644
index 0000000..b5e6e7d
--- /dev/null
+++ b/sdk/go/arvados/resource_list_test.go
@@ -0,0 +1,21 @@
+package arvados
+
+import (
+ "bytes"
+ "encoding/json"
+ "testing"
+ "time"
+)
+
+func TestMarshalFiltersWithNanoseconds(t *testing.T) {
+ t0 := time.Now()
+ t0str := t0.Format(time.RFC3339Nano)
+ buf, err := json.Marshal([]Filter{
+ {Attr: "modified_at", Operator: "=", Operand: t0}})
+ if err != nil {
+ t.Fatal(err)
+ }
+ if expect := []byte(`[["modified_at","=","` + t0str + `"]]`); 0 != bytes.Compare(buf, expect) {
+ t.Errorf("Encoded as %q, expected %q", buf, expect)
+ }
+}
diff --git a/sdk/go/arvados/user.go b/sdk/go/arvados/user.go
new file mode 100644
index 0000000..684a3af
--- /dev/null
+++ b/sdk/go/arvados/user.go
@@ -0,0 +1,17 @@
+package arvados
+
+// User is an arvados#user record
+type User struct {
+ UUID string `json:"uuid,omitempty"`
+ IsActive bool `json:"is_active"`
+ IsAdmin bool `json:"is_admin"`
+ Username string `json:"username,omitempty"`
+}
+
+// CurrentUser calls arvados.v1.users.current, and returns the User
+// record corresponding to this client's credentials.
+func (c *Client) CurrentUser() (User, error) {
+ var u User
+ err := c.RequestAndDecode(&u, "GET", "arvados/v1/users/current", nil, nil)
+ return u, err
+}
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
new file mode 100644
index 0000000..7471aa6
--- /dev/null
+++ b/services/keep-balance/balance.go
@@ -0,0 +1,611 @@
+package main
+
+import (
+ "fmt"
+ "log"
+ "os"
+ "runtime"
+ "strings"
+ "sync"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+)
+
+// CheckConfig returns an error if anything is wrong with the given
+// config and runOptions.
+func CheckConfig(config Config, runOptions RunOptions) error {
+ // An explicit service list and type-based discovery are mutually
+ // exclusive ways of choosing which keepstores to balance.
+ if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
+ return fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
+ }
+ // Without -once, the balancer runs periodically, so a period is required.
+ if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
+ return fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
+ }
+ return nil
+}
+
+// Balancer compares the contents of keepstore servers with the
+// collections stored in Arvados, and issues pull/trash requests
+// needed to get (closer to) the optimal data layout.
+//
+// In the optimal data layout: every data block referenced by a
+// collection is replicated at least as many times as desired by the
+// collection; there are no unreferenced data blocks older than
+// BlobSignatureTTL; and all N existing replicas of a given data block
+// are in the N best positions in rendezvous probe order.
+type Balancer struct {
+ *BlockStateMap
+ KeepServices map[string]*KeepService
+ DefaultReplication int
+ Logger *log.Logger
+ Dumper *log.Logger
+ MinMtime int64
+
+ collScanned int // number of collections processed so far by GetCurrentState
+ serviceRoots map[string]string // service UUID -> rendezvous "root" (see setupServiceRoots)
+ errors []error // deferred non-fatal errors; checked later by CheckSanityLate
+ mutex sync.Mutex // guards errors, which is appended from worker goroutines
+}
+
+// Run performs a balance operation using the given config and
+// runOptions. It should only be called once on a given Balancer
+// object. Typical usage:
+//
+// err = (&Balancer{}).Run(config, runOptions)
+func (bal *Balancer) Run(config Config, runOptions RunOptions) (err error) {
+ bal.Dumper = runOptions.Dumper
+ bal.Logger = runOptions.Logger
+ if bal.Logger == nil {
+ bal.Logger = log.New(os.Stderr, "", log.LstdFlags)
+ }
+
+ defer timeMe(bal.Logger, "Run")()
+
+ // Choose keep services either from an explicit config list or by
+ // type-based API discovery (CheckConfig rejects having both).
+ if len(config.KeepServiceList.Items) > 0 {
+ err = bal.SetKeepServices(config.KeepServiceList)
+ } else {
+ err = bal.DiscoverKeepServices(&config.Client, config.KeepServiceTypes)
+ }
+ if err != nil {
+ return
+ }
+
+ if err = bal.CheckSanityEarly(&config.Client); err != nil {
+ return
+ }
+ // Clearing trash lists must happen before GetCurrentState reads
+ // the indexes; see ClearTrashLists for the race this avoids.
+ if runOptions.CommitTrash {
+ if err = bal.ClearTrashLists(&config.Client); err != nil {
+ return
+ }
+ }
+ if err = bal.GetCurrentState(&config.Client); err != nil {
+ return
+ }
+ bal.ComputeChangeSets()
+ bal.PrintStatistics()
+ if err = bal.CheckSanityLate(); err != nil {
+ return
+ }
+ if runOptions.CommitPulls {
+ err = bal.CommitPulls(&config.Client)
+ if err != nil {
+ // Skip trash if we can't pull. (Too cautious?)
+ return
+ }
+ }
+ if runOptions.CommitTrash {
+ err = bal.CommitTrash(&config.Client)
+ }
+ return
+}
+
+// SetKeepServices sets the list of KeepServices to operate on,
+// replacing any previous list. Each service starts with an empty
+// ChangeSet.
+func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
+ bal.KeepServices = make(map[string]*KeepService)
+ for _, srv := range srvList.Items {
+ bal.KeepServices[srv.UUID] = &KeepService{
+ KeepService: srv,
+ ChangeSet: &ChangeSet{},
+ }
+ }
+ return nil
+}
+
+// DiscoverKeepServices sets the list of KeepServices by calling the
+// API to get a list of all services, and selecting the ones whose
+// ServiceType is in okTypes. Services with other types (e.g. proxies)
+// are logged and skipped.
+func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error {
+ bal.KeepServices = make(map[string]*KeepService)
+ // Build a set for O(1) membership tests while iterating services.
+ ok := make(map[string]bool)
+ for _, t := range okTypes {
+ ok[t] = true
+ }
+ return c.EachKeepService(func(srv arvados.KeepService) error {
+ if ok[srv.ServiceType] {
+ bal.KeepServices[srv.UUID] = &KeepService{
+ KeepService: srv,
+ ChangeSet: &ChangeSet{},
+ }
+ } else {
+ bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
+ }
+ return nil
+ })
+}
+
+// CheckSanityEarly checks for configuration and runtime errors that
+// can be detected before GetCurrentState() and ComputeChangeSets()
+// are called.
+//
+// If it returns an error, it is pointless to run GetCurrentState or
+// ComputeChangeSets: after doing so, the statistics would be
+// meaningless and it would be dangerous to run any Commit methods.
+func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
+ // Trash/pull requests require an active admin token.
+ u, err := c.CurrentUser()
+ if err != nil {
+ return fmt.Errorf("CurrentUser(): %v", err)
+ }
+ if !u.IsActive || !u.IsAdmin {
+ return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
+ }
+ // Proxies front other keepstores; balancing them would double-count
+ // replicas, so refuse to proceed if one is configured.
+ for _, srv := range bal.KeepServices {
+ if srv.ServiceType == "proxy" {
+ return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
+ }
+ }
+ return nil
+}
+
+// ClearTrashLists sends an empty trash list to each keep
+// service. Calling this before GetCurrentState avoids races.
+//
+// When a block appears in an index, we assume that replica will still
+// exist after we delete other replicas on other servers. However,
+// it's possible that a previous rebalancing operation made different
+// decisions (e.g., servers were added/removed, and rendezvous order
+// changed). In this case, the replica might already be on that
+// server's trash list, and it might be deleted before we send a
+// replacement trash list.
+//
+// We avoid this problem if we clear all trash lists before getting
+// indexes. (We also assume there is only one rebalancing process
+// running at a time.)
+func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
+ // Replace every ChangeSet with an empty one, then commit: the
+ // commit of an empty Trashes list clears the server-side list.
+ for _, srv := range bal.KeepServices {
+ srv.ChangeSet = &ChangeSet{}
+ }
+ return bal.CommitTrash(c)
+}
+
+// GetCurrentState determines the current replication state, and the
+// desired replication level, for every block that is either
+// retrievable or referenced.
+//
+// It determines the current replication state by reading the block index
+// from every known Keep service.
+//
+// It determines the desired replication level by retrieving all
+// collection manifests in the database (API server).
+//
+// It encodes the resulting information in BlockStateMap.
+func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
+ defer timeMe(bal.Logger, "GetCurrentState")()
+ bal.BlockStateMap = NewBlockStateMap()
+
+ dd, err := c.DiscoveryDocument()
+ if err != nil {
+ return err
+ }
+ bal.DefaultReplication = dd.DefaultCollectionReplication
+ // Replicas modified after this cutoff might still carry valid
+ // client-held signatures, so they must not be trashed.
+ bal.MinMtime = time.Now().Unix() - dd.BlobSignatureTTL
+
+ // Capacity 2+len(...) gives every sender (one per index goroutine,
+ // plus the collection processor and the collection fetcher) a free
+ // slot, so no worker blocks on errs.
+ errs := make(chan error, 2+len(bal.KeepServices))
+ wg := sync.WaitGroup{}
+
+ // Start one goroutine for each KeepService: retrieve the
+ // index, and add the returned blocks to BlockStateMap.
+ for _, srv := range bal.KeepServices {
+ wg.Add(1)
+ go func(srv *KeepService) {
+ defer wg.Done()
+ bal.logf("%s: retrieve index", srv)
+ idx, err := srv.Index(c, "")
+ if err != nil {
+ errs <- fmt.Errorf("%s: %v", srv, err)
+ return
+ }
+ bal.logf("%s: add %d replicas to map", srv, len(idx))
+ bal.BlockStateMap.AddReplicas(srv, idx)
+ bal.logf("%s: done", srv)
+ }(srv)
+ }
+
+ // collQ buffers incoming collections so we can start fetching
+ // the next page without waiting for the current page to
+ // finish processing. (1000 happens to match the page size
+ // used by (*arvados.Client)EachCollection(), but it's OK if
+ // they don't match.)
+ collQ := make(chan arvados.Collection, 1000)
+
+ // Start a goroutine to process collections. (We could use a
+ // worker pool here, but even with a single worker we already
+ // process collections much faster than we can retrieve them.)
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for coll := range collQ {
+ err := bal.addCollection(coll)
+ if err != nil {
+ errs <- err
+ // Drain collQ so the fetcher goroutine is never
+ // blocked on a send and can observe the error.
+ for range collQ {
+ }
+ return
+ }
+ bal.collScanned++
+ }
+ }()
+
+ // Start a goroutine to retrieve all collections from the
+ // Arvados database and send them to collQ for processing.
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ // NOTE(review): this reuses the enclosing err variable. Safe
+ // here because nothing else reads or writes it after this
+ // goroutine starts, but a fresh local would be clearer.
+ err = EachCollection(c,
+ func(coll arvados.Collection) error {
+ collQ <- coll
+ if len(errs) > 0 {
+ // some other GetCurrentState
+ // error happened: no point
+ // getting any more
+ // collections.
+ // (The empty error is just a sentinel to
+ // abort EachCollection early.)
+ return fmt.Errorf("")
+ }
+ return nil
+ }, func(done, total int) {
+ bal.logf("collections: %d/%d", done, total)
+ })
+ close(collQ)
+ if err != nil {
+ errs <- err
+ }
+ }()
+
+ go func() {
+ // Send a nil error when all goroutines finish. If
+ // this is the first error sent to errs, then
+ // everything worked.
+ wg.Wait()
+ errs <- nil
+ }()
+ // Return the first result: nil on full success, otherwise the
+ // first error any worker reported.
+ return <-errs
+}
+
+// addCollection records one collection's blocks and desired
+// replication in BlockStateMap. A malformed manifest is recorded as a
+// deferred error (surfaced later by CheckSanityLate) rather than
+// aborting the scan, so one bad collection doesn't stop the run.
+func (bal *Balancer) addCollection(coll arvados.Collection) error {
+ blkids, err := coll.SizedDigests()
+ if err != nil {
+ bal.mutex.Lock()
+ bal.errors = append(bal.errors, fmt.Errorf("%v: %v", coll.UUID, err))
+ bal.mutex.Unlock()
+ return nil
+ }
+ // A collection without an explicit replication level gets the
+ // cluster-wide default from the discovery document.
+ repl := bal.DefaultReplication
+ if coll.ReplicationDesired != nil {
+ repl = *coll.ReplicationDesired
+ }
+ debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
+ bal.BlockStateMap.IncreaseDesired(repl, blkids)
+ return nil
+}
+
+// ComputeChangeSets compares, for each known block, the current and
+// desired replication states. If it is possible to get closer to the
+// desired state by copying or deleting blocks, it adds those changes
+// to the relevant KeepServices' ChangeSets.
+//
+// It does not actually apply any of the computed changes.
+func (bal *Balancer) ComputeChangeSets() {
+ // This just calls balanceBlock() once for each block, using a
+ // pool of worker goroutines.
+ defer timeMe(bal.Logger, "ComputeChangeSets")()
+ bal.setupServiceRoots()
+
+ type balanceTask struct {
+ blkid arvados.SizedDigest
+ blk *BlockState
+ }
+ // balanceBlock is CPU-bound, so size the pool to the CPU count
+ // (+1 keeps workers busy while one blocks on the channel).
+ nWorkers := 1 + runtime.NumCPU()
+ todo := make(chan balanceTask, nWorkers)
+ var wg sync.WaitGroup
+ for i := 0; i < nWorkers; i++ {
+ wg.Add(1)
+ go func() {
+ for work := range todo {
+ bal.balanceBlock(work.blkid, work.blk)
+ }
+ wg.Done()
+ }()
+ }
+ // Feed every known block to the worker pool, then wait for the
+ // workers to drain the channel.
+ bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
+ todo <- balanceTask{
+ blkid: blkid,
+ blk: blk,
+ }
+ })
+ close(todo)
+ wg.Wait()
+}
+
+// setupServiceRoots builds the UUID->root map consumed by
+// keepclient.NewRootSorter in balanceBlock. Each service's UUID
+// doubles as its rendezvous "root" string.
+func (bal *Balancer) setupServiceRoots() {
+ bal.serviceRoots = make(map[string]string)
+ for _, srv := range bal.KeepServices {
+ bal.serviceRoots[srv.UUID] = srv.UUID
+ }
+}
+
+// Per-replica decisions made by balanceBlock, used only for the
+// human-readable Dumper output.
+const (
+ changeStay = iota // keep the existing replica on this server
+ changePull // ask this server to pull a copy
+ changeTrash // ask this server to trash its replica
+ changeNone // no action for this server
+)
+
+// changeName maps the change codes above to the labels printed by Dumper.
+var changeName = map[int]string{
+ changeStay: "stay",
+ changePull: "pull",
+ changeTrash: "trash",
+ changeNone: "none",
+}
+
+// balanceBlock compares current state to desired state for a single
+// block, and makes the appropriate ChangeSet calls.
+func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
+ debugf("balanceBlock: %v %+v", blkid, blk)
+ // Service UUIDs in rendezvous probe order for this block's md5
+ // (the first 32 characters of the sized digest).
+ uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
+ hasRepl := make(map[string]Replica, len(bal.serviceRoots))
+ for _, repl := range blk.Replicas {
+ hasRepl[repl.UUID] = repl
+ // TODO: when multiple copies are on one server, use
+ // the oldest one that doesn't have a timestamp
+ // collision with other replicas.
+ }
+ // number of replicas already found in positions better than
+ // the position we're contemplating now.
+ reportedBestRepl := 0
+ // To be safe we assume two replicas with the same Mtime are
+ // in fact the same replica being reported more than
+ // once. len(uniqueBestRepl) is the number of distinct
+ // replicas in the best rendezvous positions we've considered
+ // so far.
+ uniqueBestRepl := make(map[int64]bool, len(bal.serviceRoots))
+ // pulls is the number of Pull changes we have already
+ // requested. (For purposes of deciding whether to Pull to
+ // rendezvous position N, we should assume all pulls we have
+ // requested on rendezvous positions M<N will be successful.)
+ pulls := 0
+ var changes []string
+ for _, uuid := range uuids {
+ change := changeNone
+ srv := bal.KeepServices[uuid]
+ // TODO: request a Touch if Mtime is duplicated.
+ repl, ok := hasRepl[srv.UUID]
+ if ok {
+ // This service has a replica. We should
+ // delete it if [1] we already have enough
+ // distinct replicas in better rendezvous
+ // positions and [2] this replica's Mtime is
+ // distinct from all of the better replicas'
+ // Mtimes.
+ // (repl.Mtime < bal.MinMtime also guards against
+ // trashing a replica whose signature TTL has not
+ // yet expired.)
+ if !srv.ReadOnly &&
+ repl.Mtime < bal.MinMtime &&
+ len(uniqueBestRepl) >= blk.Desired &&
+ !uniqueBestRepl[repl.Mtime] {
+ srv.AddTrash(Trash{
+ SizedDigest: blkid,
+ Mtime: repl.Mtime,
+ })
+ change = changeTrash
+ } else {
+ change = changeStay
+ }
+ uniqueBestRepl[repl.Mtime] = true
+ reportedBestRepl++
+ } else if pulls+reportedBestRepl < blk.Desired &&
+ len(blk.Replicas) > 0 &&
+ !srv.ReadOnly {
+ // This service doesn't have a replica. We
+ // should pull one to this server if we don't
+ // already have enough (existing+requested)
+ // replicas in better rendezvous positions.
+ srv.AddPull(Pull{
+ SizedDigest: blkid,
+ Source: blk.Replicas[0].KeepService,
+ })
+ pulls++
+ change = changePull
+ }
+ if bal.Dumper != nil {
+ // When this server has no replica (ok==false), repl is
+ // the zero Replica, so the dumped Mtime is 0.
+ changes = append(changes, fmt.Sprintf("%s:%d=%s,%d", srv.ServiceHost, srv.ServicePort, changeName[change], repl.Mtime))
+ }
+ }
+ if bal.Dumper != nil {
+ bal.Dumper.Printf("%s have=%d want=%d %s", blkid, len(blk.Replicas), blk.Desired, strings.Join(changes, " "))
+ }
+}
+
+// blocksNBytes tallies a category of blocks: how many replicas, how
+// many distinct blocks, and how many bytes they occupy.
+type blocksNBytes struct {
+ replicas int
+ blocks int
+ bytes int64
+}
+
+// String renders the tally for log output.
+func (bb blocksNBytes) String() string {
+ return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
+}
+
+// balancerStats summarizes the outcome of a balancing pass: per-state
+// block tallies plus the number of pull/trash requests queued.
+type balancerStats struct {
+ lost, overrep, unref, garbage, underrep, justright blocksNBytes
+ desired, current blocksNBytes
+ pulls, trashes int
+}
+
+// getStatistics classifies every known block into exactly one
+// category (lost/underrep/garbage/unref/overrep/justright), and sums
+// queued pull/trash requests across all keep services. It should be
+// called after ComputeChangeSets.
+func (bal *Balancer) getStatistics() (s balancerStats) {
+ bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
+ // surplus is negative when the block is under-replicated,
+ // so -surplus below counts the missing replicas.
+ surplus := len(blk.Replicas) - blk.Desired
+ bytes := blkid.Size()
+ switch {
+ case len(blk.Replicas) == 0 && blk.Desired > 0:
+ s.lost.replicas -= surplus
+ s.lost.blocks++
+ s.lost.bytes += bytes * int64(-surplus)
+ case len(blk.Replicas) < blk.Desired:
+ s.underrep.replicas -= surplus
+ s.underrep.blocks++
+ s.underrep.bytes += bytes * int64(-surplus)
+ case len(blk.Replicas) > 0 && blk.Desired == 0:
+ // Unreferenced: garbage if all replicas are old enough
+ // to trash, otherwise unref (too new to touch).
+ counter := &s.garbage
+ for _, r := range blk.Replicas {
+ if r.Mtime >= bal.MinMtime {
+ counter = &s.unref
+ break
+ }
+ }
+ counter.replicas += surplus
+ counter.blocks++
+ counter.bytes += bytes * int64(surplus)
+ case len(blk.Replicas) > blk.Desired:
+ s.overrep.replicas += surplus
+ s.overrep.blocks++
+ s.overrep.bytes += bytes * int64(len(blk.Replicas)-blk.Desired)
+ default:
+ s.justright.replicas += blk.Desired
+ s.justright.blocks++
+ s.justright.bytes += bytes * int64(blk.Desired)
+ }
+
+ // Cross-cutting totals: desired commitment and current usage.
+ if blk.Desired > 0 {
+ s.desired.replicas += blk.Desired
+ s.desired.blocks++
+ s.desired.bytes += bytes * int64(blk.Desired)
+ }
+ if len(blk.Replicas) > 0 {
+ s.current.replicas += len(blk.Replicas)
+ s.current.blocks++
+ s.current.bytes += bytes * int64(len(blk.Replicas))
+ }
+ })
+ for _, srv := range bal.KeepServices {
+ s.pulls += len(srv.ChangeSet.Pulls)
+ s.trashes += len(srv.ChangeSet.Trashes)
+ }
+ return
+}
+
+// PrintStatistics writes statistics about the computed changes to
+// bal.Logger. It should not be called until ComputeChangeSets has
+// finished.
+func (bal *Balancer) PrintStatistics() {
+ s := bal.getStatistics()
+ bal.logf("===")
+ bal.logf("%s lost (0=have<want)", s.lost)
+ bal.logf("%s underreplicated (0<have<want)", s.underrep)
+ bal.logf("%s just right (have=want)", s.justright)
+ bal.logf("%s overreplicated (have>want>0)", s.overrep)
+ bal.logf("%s unreferenced (have>want=0, new)", s.unref)
+ bal.logf("%s garbage (have>want=0, old)", s.garbage)
+ bal.logf("===")
+ bal.logf("%s total commitment (excluding unreferenced)", s.desired)
+ bal.logf("%s total usage", s.current)
+ bal.logf("===")
+ // Per-service summary of queued pulls/trashes.
+ for _, srv := range bal.KeepServices {
+ bal.logf("%s: %v\n", srv, srv.ChangeSet)
+ }
+ bal.logf("===")
+}
+
+// CheckSanityLate checks for configuration and runtime errors after
+// GetCurrentState() and ComputeChangeSets() have finished.
+//
+// If it returns an error, it is dangerous to run any Commit methods.
+func (bal *Balancer) CheckSanityLate() error {
+ // Any deferred error (e.g. an unparseable manifest) means the
+ // desired-replication picture is incomplete.
+ if bal.errors != nil {
+ for _, err := range bal.errors {
+ bal.logf("deferred error: %v", err)
+ }
+ return fmt.Errorf("cannot proceed safely after deferred errors")
+ }
+
+ if bal.collScanned == 0 {
+ return fmt.Errorf("received zero collections")
+ }
+
+ // If no block wants any replicas, something upstream is almost
+ // certainly wrong — committing trash would delete everything.
+ anyDesired := false
+ bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
+ if blk.Desired > 0 {
+ anyDesired = true
+ }
+ })
+ if !anyDesired {
+ return fmt.Errorf("zero blocks have desired replication>0")
+ }
+
+ if dr := bal.DefaultReplication; dr < 1 {
+ return fmt.Errorf("Default replication (%d) is less than 1", dr)
+ }
+
+ // TODO: no two services have identical indexes
+ // TODO: no collisions (same md5, different size)
+ return nil
+}
+
+// CommitPulls sends the computed lists of pull requests to the
+// keepstore servers. This has the effect of increasing replication of
+// existing blocks that are either underreplicated or poorly
+// distributed according to rendezvous hashing.
+func (bal *Balancer) CommitPulls(c *arvados.Client) error {
+ return bal.commitAsync(c, "send pull list",
+ func(srv *KeepService) error {
+ return srv.CommitPulls(c)
+ })
+}
+
+// CommitTrash sends the computed lists of trash requests to the
+// keepstore servers. This has the effect of deleting blocks that are
+// overreplicated or unreferenced.
+func (bal *Balancer) CommitTrash(c *arvados.Client) error {
+ return bal.commitAsync(c, "send trash list",
+ func(srv *KeepService) error {
+ return srv.CommitTrash(c)
+ })
+}
+
+// commitAsync runs f concurrently for every keep service, logging
+// each failure and returning the last error seen (nil if all
+// succeed). label describes the operation for logs/timings.
+func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
+ // errs is unbuffered; the receive loop below takes exactly one
+ // result per service, so every sender is eventually unblocked.
+ errs := make(chan error)
+ for _, srv := range bal.KeepServices {
+ go func(srv *KeepService) {
+ var err error
+ defer func() { errs <- err }()
+ // Deliberately shadows the outer label with a
+ // per-service version for timing/error messages.
+ label := fmt.Sprintf("%s: %v", srv, label)
+ defer timeMe(bal.Logger, label)()
+ err = f(srv)
+ if err != nil {
+ err = fmt.Errorf("%s: %v", label, err)
+ }
+ }(srv)
+ }
+ var lastErr error
+ // Collect one result per service.
+ for _ = range bal.KeepServices {
+ if err := <-errs; err != nil {
+ bal.logf("%v", err)
+ lastErr = err
+ }
+ }
+ close(errs)
+ return lastErr
+}
+
+// logf logs via bal.Logger; it is a no-op when no Logger is set.
+func (bal *Balancer) logf(f string, args ...interface{}) {
+ if bal.Logger != nil {
+ bal.Logger.Printf(f, args...)
+ }
+}
diff --git a/services/keep-balance/balance_run_test.go b/services/keep-balance/balance_run_test.go
new file mode 100644
index 0000000..a138d91
--- /dev/null
+++ b/services/keep-balance/balance_run_test.go
@@ -0,0 +1,374 @@
+package main
+
+import (
+ _ "encoding/json"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "log"
+ "net/http"
+ "net/http/httptest"
+ "strings"
+ "sync"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&runSuite{})
+
+// reqTracker records copies of the HTTP requests seen by a stub
+// handler, so tests can count and inspect them. Safe for concurrent
+// use via the embedded mutex.
+type reqTracker struct {
+ reqs []http.Request
+ sync.Mutex
+}
+
+// Count returns the number of requests recorded so far.
+func (rt *reqTracker) Count() int {
+ rt.Lock()
+ defer rt.Unlock()
+ return len(rt.reqs)
+}
+
+// Add records a (shallow) copy of req and returns the new count.
+func (rt *reqTracker) Add(req *http.Request) int {
+ rt.Lock()
+ defer rt.Unlock()
+ rt.reqs = append(rt.reqs, *req)
+ return len(rt.reqs)
+}
+
+// stubServer is an HTTP transport that intercepts and processes all
+// requests using its own handlers.
+type stubServer struct {
+ mux *http.ServeMux
+ srv *httptest.Server
+ mutex sync.Mutex
+ Requests reqTracker
+ logf func(string, ...interface{})
+}
+
+// Start initializes the stub server and returns an *http.Client that
+// uses the stub server to handle all requests.
+//
+// A stubServer that has been started should eventually be shut down
+// with Close().
+func (s *stubServer) Start() *http.Client {
+ // Set up a config.Client that forwards all requests to s.mux
+ // via s.srv. Test cases will attach handlers to s.mux to get
+ // the desired responses.
+ s.mux = http.NewServeMux()
+ s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ s.mutex.Lock()
+ s.Requests.Add(r)
+ s.mutex.Unlock()
+ w.Header().Set("Content-Type", "application/json")
+ s.mux.ServeHTTP(w, r)
+ }))
+ return &http.Client{Transport: s}
+}
+
+// RoundTrip implements http.RoundTripper by serving the request
+// directly from s.mux into an in-memory recorder — no network.
+//
+// NOTE(review): requests made through the returned client take this
+// path and bypass the counting wrapper in Start(); only per-handler
+// reqTrackers see them. Confirm tests rely on those, not s.Requests.
+func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) {
+ w := httptest.NewRecorder()
+ s.mux.ServeHTTP(w, req)
+ return &http.Response{
+ StatusCode: w.Code,
+ Status: fmt.Sprintf("%d %s", w.Code, http.StatusText(w.Code)),
+ Header: w.HeaderMap,
+ Body: ioutil.NopCloser(w.Body)}, nil
+}
+
+// Close releases resources used by the server.
+func (s *stubServer) Close() {
+ s.srv.Close()
+}
+
+// serveStatic registers a handler that always responds with data, and
+// returns a reqTracker that counts the requests it receives. The
+// request body is drained and closed so connections can be reused.
+func (s *stubServer) serveStatic(path, data string) *reqTracker {
+ rt := &reqTracker{}
+ s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
+ rt.Add(r)
+ if r.Body != nil {
+ ioutil.ReadAll(r.Body)
+ r.Body.Close()
+ }
+ io.WriteString(w, data)
+ })
+ return rt
+}
+
+// serveCurrentUserAdmin stubs users/current as an active admin.
+func (s *stubServer) serveCurrentUserAdmin() *reqTracker {
+ return s.serveStatic("/arvados/v1/users/current",
+ `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":true,"is_active":true}`)
+}
+
+// serveCurrentUserNotAdmin stubs users/current as active but non-admin.
+func (s *stubServer) serveCurrentUserNotAdmin() *reqTracker {
+ return s.serveStatic("/arvados/v1/users/current",
+ `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":false,"is_active":true}`)
+}
+
+// serveDiscoveryDoc stubs the discovery document with replication=2.
+func (s *stubServer) serveDiscoveryDoc() *reqTracker {
+ return s.serveStatic("/discovery/v1/apis/arvados/v1/rest",
+ `{"defaultCollectionReplication":2}`)
+}
+
+// serveZeroCollections stubs an empty collection list.
+func (s *stubServer) serveZeroCollections() *reqTracker {
+ return s.serveStatic("/arvados/v1/collections",
+ `{"items":[],"items_available":0}`)
+}
+
+// serveFooBarFileCollections stubs a two-collection listing ("foo" and
+// "bar" blocks); any follow-up page filtered on modified_at is empty.
+func (s *stubServer) serveFooBarFileCollections() *reqTracker {
+ rt := &reqTracker{}
+ s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
+ r.ParseForm()
+ rt.Add(r)
+ if strings.Contains(r.Form.Get("filters"), `modified_at`) {
+ io.WriteString(w, `{"items_available":0,"items":[]}`)
+ } else {
+ io.WriteString(w, `{"items_available":2,"items":[
+ {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
+ {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
+ }
+ })
+ return rt
+}
+
+// serveCollectionsButSkipOne simulates a server that claims 3
+// collections exist up to the cutoff time but only ever returns 2, so
+// the client should detect a skipped collection. (The \u003c/\u003e
+// escapes match the JSON-encoded "<="/">=" operators in the filters.)
+func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
+ rt := &reqTracker{}
+ s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
+ r.ParseForm()
+ rt.Add(r)
+ if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003c="`) {
+ io.WriteString(w, `{"items_available":3,"items":[]}`)
+ } else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e="`) {
+ io.WriteString(w, `{"items_available":0,"items":[]}`)
+ } else {
+ io.WriteString(w, `{"items_available":2,"items":[
+ {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
+ {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
+ }
+ })
+ return rt
+}
+
+// serveZeroKeepServices stubs an empty keep service list.
+func (s *stubServer) serveZeroKeepServices() *reqTracker {
+ return s.serveStatic("/arvados/v1/keep_services",
+ `{"items":[],"items_available":0}`)
+}
+
+// serveFourDiskKeepServices stubs four "disk" keepstores plus one
+// proxy (which type-based discovery is expected to skip).
+func (s *stubServer) serveFourDiskKeepServices() *reqTracker {
+ return s.serveStatic("/arvados/v1/keep_services", `{"items_available":5,"items":[
+ {"uuid":"zzzzz-bi6l4-000000000000000","service_host":"keep0.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
+ {"uuid":"zzzzz-bi6l4-000000000000001","service_host":"keep1.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
+ {"uuid":"zzzzz-bi6l4-000000000000002","service_host":"keep2.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
+ {"uuid":"zzzzz-bi6l4-000000000000003","service_host":"keep3.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
+ {"uuid":"zzzzz-bi6l4-h0a0xwut9qa6g3a","service_host":"keep.zzzzz.arvadosapi.com","service_port":25333,"service_ssl_flag":true,"service_type":"proxy"}]}`)
+}
+
+// serveKeepstoreIndexFoo4Bar1 stubs /index/ so every server reports a
+// "foo" replica (with a per-request distinct mtime) and only keep0
+// also reports "bar" — i.e. foo is overreplicated, bar is not.
+func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
+ rt := &reqTracker{}
+ s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
+ count := rt.Add(r)
+ if r.Host == "keep0.zzzzz.arvadosapi.com:25107" {
+ io.WriteString(w, "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n")
+ }
+ fmt.Fprintf(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n\n", 12345678+count)
+ })
+ return rt
+}
+
+// serveKeepstoreTrash accepts and counts PUT /trash requests.
+func (s *stubServer) serveKeepstoreTrash() *reqTracker {
+ return s.serveStatic("/trash", `{}`)
+}
+
+// serveKeepstorePull accepts and counts PUT /pull requests.
+func (s *stubServer) serveKeepstorePull() *reqTracker {
+ return s.serveStatic("/pull", `{}`)
+}
+
+// runSuite exercises Balancer.Run end-to-end against a stubServer.
+type runSuite struct {
+ stub stubServer
+ config Config
+}
+
+// make a log.Logger that writes to the current test's c.Log().
+// A background goroutine copies pipe output, trimming one trailing
+// newline per read so c.Log doesn't double-space the output.
+func (s *runSuite) logger(c *check.C) *log.Logger {
+ r, w := io.Pipe()
+ go func() {
+ buf := make([]byte, 10000)
+ for {
+ n, err := r.Read(buf)
+ if n > 0 {
+ if buf[n-1] == '\n' {
+ n--
+ }
+ c.Log(string(buf[:n]))
+ }
+ if err != nil {
+ break
+ }
+ }
+ }()
+ return log.New(w, "", log.LstdFlags)
+}
+
+// SetUpTest points the client at a fresh stub server; individual
+// tests register only the stub handlers they need.
+func (s *runSuite) SetUpTest(c *check.C) {
+ s.config = Config{
+ Client: arvados.Client{
+ AuthToken: "xyzzy",
+ APIHost: "zzzzz.arvadosapi.com",
+ Client: s.stub.Start()},
+ KeepServiceTypes: []string{"disk"}}
+ s.stub.serveDiscoveryDoc()
+ s.stub.logf = c.Logf
+}
+
+// TearDownTest shuts the stub server down.
+func (s *runSuite) TearDownTest(c *check.C) {
+ s.stub.Close()
+}
+
+// TestRefuseZeroCollections: with no collections, the run must abort
+// after clearing trash lists (4 requests) but before pulling/trashing.
+func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
+ opts := RunOptions{
+ CommitPulls: true,
+ CommitTrash: true,
+ Logger: s.logger(c),
+ }
+ s.stub.serveCurrentUserAdmin()
+ s.stub.serveZeroCollections()
+ s.stub.serveFourDiskKeepServices()
+ s.stub.serveKeepstoreIndexFoo4Bar1()
+ trashReqs := s.stub.serveKeepstoreTrash()
+ pullReqs := s.stub.serveKeepstorePull()
+ err := (&Balancer{}).Run(s.config, opts)
+ c.Check(err, check.ErrorMatches, "received zero collections")
+ c.Check(trashReqs.Count(), check.Equals, 4)
+ c.Check(pullReqs.Count(), check.Equals, 0)
+}
+
+// TestServiceTypes: with a type filter matching no services, no
+// keepstore should be contacted at all.
+func (s *runSuite) TestServiceTypes(c *check.C) {
+ opts := RunOptions{
+ CommitPulls: true,
+ CommitTrash: true,
+ Logger: s.logger(c),
+ }
+ s.config.KeepServiceTypes = []string{"unlisted-type"}
+ s.stub.serveCurrentUserAdmin()
+ s.stub.serveFooBarFileCollections()
+ s.stub.serveFourDiskKeepServices()
+ indexReqs := s.stub.serveKeepstoreIndexFoo4Bar1()
+ trashReqs := s.stub.serveKeepstoreTrash()
+ err := (&Balancer{}).Run(s.config, opts)
+ c.Check(err, check.IsNil)
+ c.Check(indexReqs.Count(), check.Equals, 0)
+ c.Check(trashReqs.Count(), check.Equals, 0)
+}
+
+// TestRefuseNonAdmin: a non-admin token must abort before any
+// keepstore request is issued.
+func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
+ opts := RunOptions{
+ CommitPulls: true,
+ CommitTrash: true,
+ Logger: s.logger(c),
+ }
+ s.stub.serveCurrentUserNotAdmin()
+ s.stub.serveZeroCollections()
+ s.stub.serveFourDiskKeepServices()
+ trashReqs := s.stub.serveKeepstoreTrash()
+ pullReqs := s.stub.serveKeepstorePull()
+ err := (&Balancer{}).Run(s.config, opts)
+ c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
+ c.Check(trashReqs.Count(), check.Equals, 0)
+ c.Check(pullReqs.Count(), check.Equals, 0)
+}
+
+// TestDetectSkippedCollections: if the server claims more collections
+// exist than were retrieved, the run must stop before pulling.
+func (s *runSuite) TestDetectSkippedCollections(c *check.C) {
+ opts := RunOptions{
+ CommitPulls: true,
+ CommitTrash: true,
+ Logger: s.logger(c),
+ }
+ s.stub.serveCurrentUserAdmin()
+ s.stub.serveCollectionsButSkipOne()
+ s.stub.serveFourDiskKeepServices()
+ s.stub.serveKeepstoreIndexFoo4Bar1()
+ trashReqs := s.stub.serveKeepstoreTrash()
+ pullReqs := s.stub.serveKeepstorePull()
+ err := (&Balancer{}).Run(s.config, opts)
+ c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`)
+ c.Check(trashReqs.Count(), check.Equals, 4)
+ c.Check(pullReqs.Count(), check.Equals, 0)
+}
+
+// TestDryRun: with commits disabled, changes are computed (non-zero
+// stats) but no trash/pull requests go out.
+func (s *runSuite) TestDryRun(c *check.C) {
+ opts := RunOptions{
+ CommitPulls: false,
+ CommitTrash: false,
+ Logger: s.logger(c),
+ }
+ s.stub.serveCurrentUserAdmin()
+ s.stub.serveFooBarFileCollections()
+ s.stub.serveFourDiskKeepServices()
+ s.stub.serveKeepstoreIndexFoo4Bar1()
+ trashReqs := s.stub.serveKeepstoreTrash()
+ pullReqs := s.stub.serveKeepstorePull()
+ var bal Balancer
+ err := bal.Run(s.config, opts)
+ c.Check(err, check.IsNil)
+ c.Check(trashReqs.Count(), check.Equals, 0)
+ c.Check(pullReqs.Count(), check.Equals, 0)
+ stats := bal.getStatistics()
+ c.Check(stats.pulls, check.Not(check.Equals), 0)
+ c.Check(stats.underrep.replicas, check.Not(check.Equals), 0)
+ c.Check(stats.overrep.replicas, check.Not(check.Equals), 0)
+}
+
+// TestCommit: a full run sends 4 clear-trash + 4 trash lists (8) and
+// 4 pull lists, with the expected trash/pull counts for foo/bar.
+func (s *runSuite) TestCommit(c *check.C) {
+ opts := RunOptions{
+ CommitPulls: true,
+ CommitTrash: true,
+ Logger: s.logger(c),
+ Dumper: s.logger(c),
+ }
+ s.stub.serveCurrentUserAdmin()
+ s.stub.serveFooBarFileCollections()
+ s.stub.serveFourDiskKeepServices()
+ s.stub.serveKeepstoreIndexFoo4Bar1()
+ trashReqs := s.stub.serveKeepstoreTrash()
+ pullReqs := s.stub.serveKeepstorePull()
+ var bal Balancer
+ err := bal.Run(s.config, opts)
+ c.Check(err, check.IsNil)
+ c.Check(trashReqs.Count(), check.Equals, 8)
+ c.Check(pullReqs.Count(), check.Equals, 4)
+ stats := bal.getStatistics()
+ // "foo" block is overreplicated by 2
+ c.Check(stats.trashes, check.Equals, 2)
+ // "bar" block is underreplicated by 1, and its only copy is
+ // in a poor rendezvous position
+ c.Check(stats.pulls, check.Equals, 2)
+}
+
+// TestRunForever: RunForever with a tiny RunPeriod should complete
+// several full runs quickly, and stop when signaled.
+func (s *runSuite) TestRunForever(c *check.C) {
+ opts := RunOptions{
+ CommitPulls: true,
+ CommitTrash: true,
+ Logger: s.logger(c),
+ Dumper: s.logger(c),
+ }
+ s.stub.serveCurrentUserAdmin()
+ s.stub.serveFooBarFileCollections()
+ s.stub.serveFourDiskKeepServices()
+ s.stub.serveKeepstoreIndexFoo4Bar1()
+ trashReqs := s.stub.serveKeepstoreTrash()
+ pullReqs := s.stub.serveKeepstorePull()
+
+ stop := make(chan interface{})
+ s.config.RunPeriod = arvados.Duration(time.Millisecond)
+ go RunForever(s.config, opts, stop)
+
+ // Each run should send 4 clear trash lists + 4 pull lists + 4
+ // trash lists. We should complete four runs in much less than
+ // a second.
+ for t0 := time.Now(); pullReqs.Count() < 16 && time.Since(t0) < 10*time.Second; {
+ time.Sleep(time.Millisecond)
+ }
+ stop <- true
+ c.Check(pullReqs.Count() >= 16, check.Equals, true)
+ c.Check(trashReqs.Count(), check.Equals, 2*pullReqs.Count())
+}
diff --git a/services/keep-balance/balance_test.go b/services/keep-balance/balance_test.go
new file mode 100644
index 0000000..682a5fb
--- /dev/null
+++ b/services/keep-balance/balance_test.go
@@ -0,0 +1,255 @@
+package main
+
+import (
+ "crypto/md5"
+ "fmt"
+ "sort"
+ "strconv"
+ "testing"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+
+ check "gopkg.in/check.v1"
+)
+
+// Test with Gocheck
+// Test hooks the gocheck test suites into "go test".
+func Test(t *testing.T) {
+ check.TestingT(t)
+}
+
+var _ = check.Suite(&balancerSuite{})
+
+// balancerSuite embeds a Balancer and adds fixtures: 16 fake keep
+// services, precomputed rendezvous orders, and a signature TTL.
+type balancerSuite struct {
+ Balancer
+ srvs []*KeepService
+ blks map[string]tester
+ knownRendezvous [][]int
+ signatureTTL int64
+}
+
+const (
+ // index into knownRendezvous
+ known0 = 0
+)
+
+// slots is a list of positions in a block's rendezvous probe order.
+type slots []int
+
+// tester describes one balanceBlock scenario: the fixture block and
+// replica layout, plus the pulls/trashes the balancer should emit.
+type tester struct {
+ known int
+ desired int
+ current slots
+ timestamps []int64
+ shouldPull slots
+ shouldTrash slots
+}
+
+// SetUpSuite decodes the known rendezvous orders (each hex digit is a
+// server index, best-first) and fixes the signature TTL used by the
+// tests.
+func (bal *balancerSuite) SetUpSuite(c *check.C) {
+ bal.knownRendezvous = nil
+ for _, str := range []string{
+ "3eab2d5fc9681074",
+ "097dba52e648f1c3",
+ "c5b4e023f8a7d691",
+ "9d81c02e76a3bf54",
+ } {
+ var slots []int
+ for _, c := range []byte(str) {
+ // Each character is a 4-bit server index.
+ pos, _ := strconv.ParseUint(string(c), 16, 4)
+ slots = append(slots, int(pos))
+ }
+ bal.knownRendezvous = append(bal.knownRendezvous, slots)
+ }
+
+ bal.signatureTTL = 3600
+}
+
+// SetUpTest creates 16 fresh fake keep services and resets MinMtime
+// so replicas older than the signature TTL are eligible for trash.
+func (bal *balancerSuite) SetUpTest(c *check.C) {
+ bal.srvs = make([]*KeepService, 16)
+ bal.KeepServices = make(map[string]*KeepService)
+ for i := range bal.srvs {
+ srv := &KeepService{
+ KeepService: arvados.KeepService{
+ UUID: fmt.Sprintf("zzzzz-bi6l4-%015x", i),
+ },
+ }
+ bal.srvs[i] = srv
+ bal.KeepServices[srv.UUID] = srv
+ }
+
+ bal.MinMtime = time.Now().Unix() - bal.signatureTTL
+}
+
+// TestPerfect: replicas already in the two best slots => no changes.
+func (bal *balancerSuite) TestPerfect(c *check.C) {
+ bal.try(c, tester{
+ desired: 2,
+ current: slots{0, 1},
+ shouldPull: nil,
+ shouldTrash: nil})
+}
+
+// TestDecreaseRepl: one excess replica => trash the worst-placed one.
+func (bal *balancerSuite) TestDecreaseRepl(c *check.C) {
+ bal.try(c, tester{
+ desired: 2,
+ current: slots{0, 2, 1},
+ shouldTrash: slots{2}})
+}
+
+// TestDecreaseReplToZero: unreferenced block => trash every replica.
+func (bal *balancerSuite) TestDecreaseReplToZero(c *check.C) {
+ bal.try(c, tester{
+ desired: 0,
+ current: slots{0, 1, 3},
+ shouldTrash: slots{0, 1, 3}})
+}
+
+// TestIncreaseRepl: underreplicated => pull to the next-best slots.
+func (bal *balancerSuite) TestIncreaseRepl(c *check.C) {
+ bal.try(c, tester{
+ desired: 4,
+ current: slots{0, 1},
+ shouldPull: slots{2, 3}})
+}
+
+// TestSkipReadonly: a read-only server cannot accept pulls, so the
+// balancer skips slot 3 and pulls to slot 4 instead.
+func (bal *balancerSuite) TestSkipReadonly(c *check.C) {
+ bal.srvList(0, slots{3})[0].ReadOnly = true
+ bal.try(c, tester{
+ desired: 4,
+ current: slots{0, 1},
+ shouldPull: slots{2, 4}})
+}
+
+// TestFixUnbalanced: replicas exist but not in the best rendezvous
+// slots; the balancer pulls into better positions first and only
+// trashes once the better copies exist.
+func (bal *balancerSuite) TestFixUnbalanced(c *check.C) {
+ bal.try(c, tester{
+ desired: 2,
+ current: slots{2, 0},
+ shouldPull: slots{1}})
+ bal.try(c, tester{
+ desired: 2,
+ current: slots{2, 7},
+ shouldPull: slots{0, 1}})
+ // if only one of the pulls succeeds, we'll see this next:
+ bal.try(c, tester{
+ desired: 2,
+ current: slots{2, 1, 7},
+ shouldPull: slots{0},
+ shouldTrash: slots{7}})
+ // if both pulls succeed, we'll see this next:
+ bal.try(c, tester{
+ desired: 2,
+ current: slots{2, 0, 1, 7},
+ shouldTrash: slots{2, 7}})
+
+ // unbalanced + excessive replication => pull + trash
+ bal.try(c, tester{
+ desired: 2,
+ current: slots{2, 5, 7},
+ shouldPull: slots{0, 1},
+ shouldTrash: slots{7}})
+}
+
+func (bal *balancerSuite) TestIncreaseReplTimestampCollision(c *check.C) {
+ // For purposes of increasing replication, we assume identical
+ // replicas are distinct.
+ bal.try(c, tester{
+ desired: 4,
+ current: slots{0, 1},
+ timestamps: []int64{12345678, 12345678},
+ shouldPull: slots{2, 3}})
+}
+
+func (bal *balancerSuite) TestDecreaseReplTimestampCollision(c *check.C) {
+ // For purposes of decreasing replication, we assume identical
+ // replicas are NOT distinct, so nothing gets trashed here even
+ // though there are three replicas of a desired-2 block.
+ bal.try(c, tester{
+ desired: 2,
+ current: slots{0, 1, 2},
+ timestamps: []int64{12345678, 12345678, 12345678}})
+ bal.try(c, tester{
+ desired: 2,
+ current: slots{0, 1, 2},
+ timestamps: []int64{12345678, 10000000, 10000000}})
+}
+
+// TestDecreaseReplBlockTooNew: replicas newer than MinMtime must not
+// be trashed (their signatures may still be in use by clients).
+func (bal *balancerSuite) TestDecreaseReplBlockTooNew(c *check.C) {
+ oldTime := bal.MinMtime - 3600
+ newTime := bal.MinMtime + 3600
+ // The excess replica is too new to delete.
+ bal.try(c, tester{
+ desired: 2,
+ current: slots{0, 1, 2},
+ timestamps: []int64{oldTime, newTime, newTime + 1}})
+ // The best replicas are too new to delete, but the excess
+ // replica is old enough.
+ bal.try(c, tester{
+ desired: 2,
+ current: slots{0, 1, 2},
+ timestamps: []int64{newTime, newTime + 1, oldTime},
+ shouldTrash: slots{2}})
+}
+
+// Clear all servers' changesets, balance a single block, and verify
+// the appropriate changes for that block have been added to the
+// changesets.
+func (bal *balancerSuite) try(c *check.C, t tester) {
+ bal.setupServiceRoots()
+ blk := &BlockState{
+ Desired: t.desired,
+ Replicas: bal.replList(t.known, t.current)}
+ // Override fixture mtimes where the scenario specifies them.
+ for i, t := range t.timestamps {
+ blk.Replicas[i].Mtime = t
+ }
+ for _, srv := range bal.srvs {
+ srv.ChangeSet = &ChangeSet{}
+ }
+ bal.balanceBlock(knownBlkid(t.known), blk)
+
+ // Translate per-server changes back into rendezvous slot
+ // numbers so they can be compared with the expected slots.
+ var didPull, didTrash slots
+ for i, srv := range bal.srvs {
+ var slot int
+ // NOTE(review): slot stays 0 if server i does not appear
+ // in the known rendezvous order -- assumes all 16 servers
+ // are listed; TODO confirm.
+ for probeOrder, srvNum := range bal.knownRendezvous[t.known] {
+ if srvNum == i {
+ slot = probeOrder
+ }
+ }
+ for _, pull := range srv.Pulls {
+ didPull = append(didPull, slot)
+ c.Check(pull.SizedDigest, check.Equals, knownBlkid(t.known))
+ }
+ for _, trash := range srv.Trashes {
+ didTrash = append(didTrash, slot)
+ c.Check(trash.SizedDigest, check.Equals, knownBlkid(t.known))
+ }
+ }
+
+ for _, list := range []slots{didPull, didTrash, t.shouldPull, t.shouldTrash} {
+ sort.Sort(sort.IntSlice(list))
+ }
+ c.Check(didPull, check.DeepEquals, t.shouldPull)
+ c.Check(didTrash, check.DeepEquals, t.shouldTrash)
+}
+
+// srvList returns the KeepServices, sorted in rendezvous order and
+// then selected by idx. For example, srvList(3, slots{0, 1, 4})
+// returns the first-, second-, and fifth-best servers for storing
+// knownBlkid(3).
+func (bal *balancerSuite) srvList(knownBlockID int, order slots) (srvs []*KeepService) {
+ for _, i := range order {
+ srvs = append(srvs, bal.srvs[bal.knownRendezvous[knownBlockID][i]])
+ }
+ return
+}
+
+// replList is like srvList but returns an "existing replicas" slice,
+// suitable for a BlockState test fixture. Mtimes are old enough to
+// be trashable, and strictly increasing so ordering is deterministic.
+func (bal *balancerSuite) replList(knownBlockID int, order slots) (repls []Replica) {
+ mtime := time.Now().Unix() - bal.signatureTTL - 86400
+ for _, srv := range bal.srvList(knownBlockID, order) {
+ repls = append(repls, Replica{srv, mtime})
+ mtime++
+ }
+ return
+}
+
+// knownBlkid generates the same data hashes that are tested in
+// sdk/go/keepclient/root_sorter_test.go: the md5 of the 64-char hex
+// encoding of i, with a "+64" size hint.
+func knownBlkid(i int) arvados.SizedDigest {
+ return arvados.SizedDigest(fmt.Sprintf("%x+64", md5.Sum([]byte(fmt.Sprintf("%064x", i)))))
+}
diff --git a/services/keep-balance/block_state.go b/services/keep-balance/block_state.go
new file mode 100644
index 0000000..d607386
--- /dev/null
+++ b/services/keep-balance/block_state.go
@@ -0,0 +1,95 @@
+package main
+
+import (
+ "sync"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// Replica is a file on disk (or object in an S3 bucket, or blob in an
+// Azure storage container, etc.) as reported in a keepstore index
+// response.
+type Replica struct {
+ *KeepService
+ Mtime int64
+}
+
+// BlockState indicates the number of desired replicas (according to
+// the collections we know about) and the replicas actually stored
+// (according to the keepstore indexes we know about).
+type BlockState struct {
+ Replicas []Replica
+ Desired int
+}
+
+// addReplica records one more stored copy of the block.
+func (bs *BlockState) addReplica(r Replica) {
+ bs.Replicas = append(bs.Replicas, r)
+}
+
+// increaseDesired raises Desired to n; it never lowers it.
+func (bs *BlockState) increaseDesired(n int) {
+ if bs.Desired < n {
+ bs.Desired = n
+ }
+}
+
+// BlockStateMap is a goroutine-safe wrapper around a
+// map[arvados.SizedDigest]*BlockState.
+type BlockStateMap struct {
+ entries map[arvados.SizedDigest]*BlockState
+ mutex sync.Mutex
+}
+
+// NewBlockStateMap returns a newly allocated BlockStateMap.
+func NewBlockStateMap() *BlockStateMap {
+ return &BlockStateMap{
+ entries: make(map[arvados.SizedDigest]*BlockState),
+ }
+}
+
+// return a BlockState entry, allocating a new one if needed. (Private
+// method: not goroutine-safe; callers must hold bsm.mutex.)
+func (bsm *BlockStateMap) get(blkid arvados.SizedDigest) *BlockState {
+ // TODO? Allocate BlockState structs a slice at a time,
+ // instead of one at a time.
+ blk := bsm.entries[blkid]
+ if blk == nil {
+ blk = &BlockState{}
+ bsm.entries[blkid] = blk
+ }
+ return blk
+}
+
+// Apply runs f on each entry in the map. The map lock is held for
+// the whole iteration, so f must not call back into bsm.
+func (bsm *BlockStateMap) Apply(f func(arvados.SizedDigest, *BlockState)) {
+ bsm.mutex.Lock()
+ defer bsm.mutex.Unlock()
+
+ for blkid, blk := range bsm.entries {
+ f(blkid, blk)
+ }
+}
+
+// AddReplicas updates the map to indicate srv has a replica of each
+// block in idx.
+func (bsm *BlockStateMap) AddReplicas(srv *KeepService, idx []arvados.KeepServiceIndexEntry) {
+ bsm.mutex.Lock()
+ defer bsm.mutex.Unlock()
+
+ for _, ent := range idx {
+ bsm.get(ent.SizedDigest).addReplica(Replica{
+ KeepService: srv,
+ Mtime: ent.Mtime,
+ })
+ }
+}
+
+// IncreaseDesired updates the map to indicate the desired replication
+// for the given blocks is at least n.
+func (bsm *BlockStateMap) IncreaseDesired(n int, blocks []arvados.SizedDigest) {
+ bsm.mutex.Lock()
+ defer bsm.mutex.Unlock()
+
+ for _, blkid := range blocks {
+ bsm.get(blkid).increaseDesired(n)
+ }
+}
diff --git a/services/keep-balance/change_set.go b/services/keep-balance/change_set.go
new file mode 100644
index 0000000..417ea7f
--- /dev/null
+++ b/services/keep-balance/change_set.go
@@ -0,0 +1,75 @@
+package main
+
+import (
+ "encoding/json"
+ "fmt"
+ "sync"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// Pull is a request to retrieve a block from a remote server, and
+// store it locally.
+type Pull struct {
+ arvados.SizedDigest
+ Source *KeepService
+}
+
+// MarshalJSON formats a pull request the way keepstore wants to see
+// it: a bare 32-char locator (size hint stripped) plus the source
+// server's URL.
+func (p Pull) MarshalJSON() ([]byte, error) {
+ type KeepstorePullRequest struct {
+ Locator string `json:"locator"`
+ Servers []string `json:"servers"`
+ }
+ return json.Marshal(KeepstorePullRequest{
+ Locator: string(p.SizedDigest[:32]),
+ Servers: []string{p.Source.URLBase()}})
+}
+
+// Trash is a request to delete a block whose stored copy has the
+// given Mtime.
+type Trash struct {
+ arvados.SizedDigest
+ Mtime int64
+}
+
+// MarshalJSON formats a trash request the way keepstore wants to see
+// it, i.e., as a bare locator with no +size hint.
+func (t Trash) MarshalJSON() ([]byte, error) {
+ type KeepstoreTrashRequest struct {
+ Locator string `json:"locator"`
+ BlockMtime int64 `json:"block_mtime"`
+ }
+ return json.Marshal(KeepstoreTrashRequest{
+ Locator: string(t.SizedDigest[:32]),
+ BlockMtime: t.Mtime})
+}
+
+// ChangeSet is a set of change requests that will be sent to a
+// keepstore server. The mutex makes Add* safe for concurrent use.
+type ChangeSet struct {
+ Pulls []Pull
+ Trashes []Trash
+ mutex sync.Mutex
+}
+
+// AddPull adds a Pull operation. Safe to call concurrently.
+func (cs *ChangeSet) AddPull(p Pull) {
+ cs.mutex.Lock()
+ cs.Pulls = append(cs.Pulls, p)
+ cs.mutex.Unlock()
+}
+
+// AddTrash adds a Trash operation. Safe to call concurrently.
+func (cs *ChangeSet) AddTrash(t Trash) {
+ cs.mutex.Lock()
+ cs.Trashes = append(cs.Trashes, t)
+ cs.mutex.Unlock()
+}
+
+// String implements fmt.Stringer with a summary of pending changes.
+func (cs *ChangeSet) String() string {
+ cs.mutex.Lock()
+ defer cs.mutex.Unlock()
+ return fmt.Sprintf("ChangeSet{Pulls:%d, Trashes:%d}", len(cs.Pulls), len(cs.Trashes))
+}
diff --git a/services/keep-balance/change_set_test.go b/services/keep-balance/change_set_test.go
new file mode 100644
index 0000000..b5dcb5c
--- /dev/null
+++ b/services/keep-balance/change_set_test.go
@@ -0,0 +1,35 @@
+package main
+
+import (
+ "encoding/json"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&changeSetSuite{})
+
+type changeSetSuite struct{}
+
+// TestJSONFormat pins the exact wire format keepstore expects for
+// pull and trash lists (bare 32-char locators, no size hints).
+func (s *changeSetSuite) TestJSONFormat(c *check.C) {
+ srv := &KeepService{
+ KeepService: arvados.KeepService{
+ UUID: "zzzzz-bi6l4-000000000000001",
+ ServiceType: "disk",
+ ServiceSSLFlag: false,
+ ServiceHost: "keep1.zzzzz.arvadosapi.com",
+ ServicePort: 25107}}
+
+ buf, err := json.Marshal([]Pull{{
+ SizedDigest: arvados.SizedDigest("acbd18db4cc2f85cedef654fccc4a4d8+3"),
+ Source: srv}})
+ c.Check(err, check.IsNil)
+ c.Check(string(buf), check.Equals, `[{"locator":"acbd18db4cc2f85cedef654fccc4a4d8","servers":["http://keep1.zzzzz.arvadosapi.com:25107"]}]`)
+
+ buf, err = json.Marshal([]Trash{{
+ SizedDigest: arvados.SizedDigest("acbd18db4cc2f85cedef654fccc4a4d8+3"),
+ Mtime: 123456789}})
+ c.Check(err, check.IsNil)
+ c.Check(string(buf), check.Equals, `[{"locator":"acbd18db4cc2f85cedef654fccc4a4d8","block_mtime":123456789}]`)
+}
diff --git a/services/keep-balance/collection.go b/services/keep-balance/collection.go
new file mode 100644
index 0000000..e6a1f08
--- /dev/null
+++ b/services/keep-balance/collection.go
@@ -0,0 +1,95 @@
+package main
+
+import (
+ "fmt"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// countCollections returns the number of collections matching params,
+// using a limit=0 request so no items are actually transferred.
+func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int, error) {
+ var page arvados.CollectionList
+ var zero int
+ params.Limit = &zero
+ err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
+ return page.ItemsAvailable, err
+}
+
+// EachCollection calls f once for every readable
+// collection. EachCollection stops if it encounters an error, such as
+// f returning a non-nil error.
+//
+// The progress function is called periodically with done (number of
+// times f has been called) and total (number of times f is expected
+// to be called).
+//
+// Paging strategy: results are ordered by (modified_at, uuid); each
+// subsequent page filters modified_at >= the last seen timestamp and
+// excludes the last seen uuid. Rows already processed are skipped by
+// comparing (modified_at, uuid) with the last processed row.
+func EachCollection(c *arvados.Client, f func(arvados.Collection) error, progress func(done, total int)) error {
+ if progress == nil {
+ progress = func(_, _ int) {}
+ }
+
+ expectCount, err := countCollections(c, arvados.ResourceListParams{})
+ if err != nil {
+ return err
+ }
+
+ limit := 1000
+ params := arvados.ResourceListParams{
+ Limit: &limit,
+ Order: "modified_at, uuid",
+ Select: []string{"uuid", "manifest_text", "modified_at", "portable_data_hash", "replication_desired"},
+ }
+ var last arvados.Collection
+ var filterTime time.Time
+ callCount := 0
+ for {
+ progress(callCount, expectCount)
+ var page arvados.CollectionList
+ err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
+ if err != nil {
+ return err
+ }
+ for _, coll := range page.Items {
+ // Skip rows that overlap with the previous page.
+ if last.ModifiedAt != nil && *last.ModifiedAt == *coll.ModifiedAt && last.UUID >= coll.UUID {
+ continue
+ }
+ callCount++
+ err = f(coll)
+ if err != nil {
+ return err
+ }
+ last = coll
+ }
+ // No progress since the last page => either done, or
+ // stuck on a single timestamp that fills a whole page.
+ if last.ModifiedAt == nil || *last.ModifiedAt == filterTime {
+ if page.ItemsAvailable > len(page.Items) {
+ // TODO: use "mtime=X && UUID>Y"
+ // filters to get all collections with
+ // this timestamp, then use "mtime>X"
+ // to get the next timestamp.
+ return fmt.Errorf("BUG: Received an entire page with the same modified_at timestamp (%v), cannot make progress", filterTime)
+ }
+ break
+ }
+ filterTime = *last.ModifiedAt
+ params.Filters = []arvados.Filter{{
+ Attr: "modified_at",
+ Operator: ">=",
+ Operand: filterTime,
+ }, {
+ Attr: "uuid",
+ Operator: "!=",
+ Operand: last.UUID,
+ }}
+ }
+ progress(callCount, expectCount)
+
+ // Sanity check: if the server now reports more collections with
+ // modtime <= filterTime than we saw, some were missed (e.g.
+ // created/modified while we were paging).
+ if checkCount, err := countCollections(c, arvados.ResourceListParams{Filters: []arvados.Filter{{
+ Attr: "modified_at",
+ Operator: "<=",
+ Operand: filterTime}}}); err != nil {
+ return err
+ } else if callCount < checkCount {
+ return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, filterTime, checkCount)
+ }
+
+ return nil
+}
diff --git a/services/keep-balance/integration_test.go b/services/keep-balance/integration_test.go
new file mode 100644
index 0000000..b090614
--- /dev/null
+++ b/services/keep-balance/integration_test.go
@@ -0,0 +1,92 @@
+package main
+
+import (
+ "bytes"
+ "log"
+ "net/http"
+ "os"
+ "strings"
+ "testing"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&integrationSuite{})
+
+// integrationSuite runs keep-balance against a real API server and
+// keepstores started by arvadostest (skipped with -short).
+type integrationSuite struct {
+ config Config
+ keepClient *keepclient.KeepClient
+}
+
+// SetUpSuite starts the test API server and four keepstores, then
+// seeds them with an overreplicated "foo" block and an
+// underreplicated "bar" block for the balancer to fix.
+func (s *integrationSuite) SetUpSuite(c *check.C) {
+ if testing.Short() {
+ c.Skip("-short")
+ }
+ arvadostest.ResetEnv()
+ arvadostest.StartAPI()
+ arvadostest.StartKeep(4, true)
+
+ arv, err := arvadosclient.MakeArvadosClient()
+ // Check the constructor error before using arv: assigning
+ // ApiToken first would mask a failed client setup.
+ c.Assert(err, check.IsNil)
+ arv.ApiToken = arvadostest.DataManagerToken
+ s.keepClient = &keepclient.KeepClient{
+ Arvados: &arv,
+ Client: &http.Client{},
+ }
+ c.Assert(s.keepClient.DiscoverKeepServers(), check.IsNil)
+ s.putReplicas(c, "foo", 4)
+ s.putReplicas(c, "bar", 1)
+}
+
+// putReplicas stores data with the requested replication level,
+// failing the test on error.
+func (s *integrationSuite) putReplicas(c *check.C, data string, replicas int) {
+ s.keepClient.Want_replicas = replicas
+ _, _, err := s.keepClient.PutB([]byte(data))
+ c.Assert(err, check.IsNil)
+}
+
+// TearDownSuite stops the services started in SetUpSuite.
+func (s *integrationSuite) TearDownSuite(c *check.C) {
+ if testing.Short() {
+ c.Skip("-short")
+ }
+ arvadostest.StopKeep(4)
+ arvadostest.StopAPI()
+}
+
+// SetUpTest builds a fresh config pointing at the test API server,
+// balancing only "disk" services.
+func (s *integrationSuite) SetUpTest(c *check.C) {
+ s.config = Config{
+ Client: arvados.Client{
+ APIHost: os.Getenv("ARVADOS_API_HOST"),
+ AuthToken: arvadostest.DataManagerToken,
+ Insecure: true,
+ },
+ KeepServiceTypes: []string{"disk"},
+ }
+}
+
+// TestBalanceAPIFixtures runs the balancer repeatedly until it stops
+// wanting to pull, then checks nothing is reported underreplicated.
+func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
+ var logBuf *bytes.Buffer
+ for iter := 0; iter < 20; iter++ {
+ // Assign with "=", not ":=": a new ":=" variable would
+ // shadow the outer logBuf and leave it nil for the final
+ // check after the loop.
+ logBuf = &bytes.Buffer{}
+ opts := RunOptions{
+ CommitPulls: true,
+ CommitTrash: true,
+ Logger: log.New(logBuf, "", log.LstdFlags),
+ }
+ err := (&Balancer{}).Run(s.config, opts)
+ c.Check(err, check.IsNil)
+ if iter == 0 {
+ c.Check(logBuf.String(), check.Matches, `(?ms).*ChangeSet{Pulls:1.*`)
+ c.Check(logBuf.String(), check.Not(check.Matches), `(?ms).*ChangeSet{.*Trashes:[^0]}*`)
+ } else if strings.Contains(logBuf.String(), "ChangeSet{Pulls:0") {
+ break
+ }
+ time.Sleep(200 * time.Millisecond)
+ }
+ // Escape the parentheses: they are literal characters in the
+ // log message, not a regexp capture group.
+ c.Check(logBuf.String(), check.Not(check.Matches), `(?ms).*0 replicas \(0 blocks, 0 bytes\) underreplicated.*`)
+}
diff --git a/services/keep-balance/keep_service.go b/services/keep-balance/keep_service.go
new file mode 100644
index 0000000..f65355d
--- /dev/null
+++ b/services/keep-balance/keep_service.go
@@ -0,0 +1,76 @@
+package main
+
+import (
+ "encoding/json"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/http"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// KeepService represents a keepstore server that is being rebalanced.
+// It pairs the API record with the ChangeSet accumulated for it.
+type KeepService struct {
+ arvados.KeepService
+ *ChangeSet
+}
+
+// String implements fmt.Stringer: "uuid (host:port, type)".
+func (srv *KeepService) String() string {
+ return fmt.Sprintf("%s (%s:%d, %s)", srv.UUID, srv.ServiceHost, srv.ServicePort, srv.ServiceType)
+}
+
+// ksSchemes maps the service's SSL flag to its URL scheme.
+var ksSchemes = map[bool]string{false: "http", true: "https"}
+
+// URLBase returns scheme://host:port for this server.
+func (srv *KeepService) URLBase() string {
+ return fmt.Sprintf("%s://%s:%d", ksSchemes[srv.ServiceSSLFlag], srv.ServiceHost, srv.ServicePort)
+}
+
+// CommitPulls sends the current list of pull requests to the storage
+// server (even if the list is empty).
+func (srv *KeepService) CommitPulls(c *arvados.Client) error {
+ return srv.put(c, "pull", srv.ChangeSet.Pulls)
+}
+
+// CommitTrash sends the current list of trash requests to the storage
+// server (even if the list is empty).
+func (srv *KeepService) CommitTrash(c *arvados.Client) error {
+ return srv.put(c, "trash", srv.ChangeSet.Trashes)
+}
+
+// Perform a PUT request at path, with data (as JSON) in the request
+// body. The JSON is streamed through a pipe so an arbitrarily large
+// change list never has to be fully buffered in memory.
+func (srv *KeepService) put(c *arvados.Client, path string, data interface{}) error {
+ // We'll start a goroutine to do the JSON encoding, so we can
+ // stream it to the http client through a Pipe, rather than
+ // keeping the entire encoded version in memory.
+ jsonR, jsonW := io.Pipe()
+
+ // errC communicates any encoding errors back to our main
+ // goroutine. Buffered so the encoder goroutine never blocks.
+ errC := make(chan error, 1)
+
+ go func() {
+ enc := json.NewEncoder(jsonW)
+ errC <- enc.Encode(data)
+ // Close the write end so the HTTP request sees EOF.
+ jsonW.Close()
+ }()
+
+ url := srv.URLBase() + "/" + path
+ req, err := http.NewRequest("PUT", url, ioutil.NopCloser(jsonR))
+ if err != nil {
+ return fmt.Errorf("building request for %s: %v", url, err)
+ }
+ err = c.DoAndDecode(nil, req)
+
+ // If there was an error encoding the request body, report
+ // that instead of the response: obviously we won't get a
+ // useful response if our request wasn't properly encoded.
+ if encErr := <-errC; encErr != nil {
+ return fmt.Errorf("encoding data for %s: %v", url, encErr)
+ }
+
+ return err
+}
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
new file mode 100644
index 0000000..42a8d63
--- /dev/null
+++ b/services/keep-balance/main.go
@@ -0,0 +1,156 @@
+package main
+
+import (
+ "encoding/json"
+ "flag"
+ "io/ioutil"
+ "log"
+ "os"
+ "os/signal"
+ "syscall"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// Config specifies site configuration, like API credentials and the
+// choice of which servers are to be balanced.
+//
+// Config is loaded from a JSON config file (see usage()).
+type Config struct {
+ // Arvados API endpoint and credentials.
+ Client arvados.Client
+
+ // List of service types (e.g., "disk") to balance.
+ KeepServiceTypes []string
+
+ // Explicit list of services to balance; when non-empty it
+ // takes the place of discovery via KeepServiceTypes.
+ KeepServiceList arvados.KeepServiceList
+
+ // How often to check
+ RunPeriod arvados.Duration
+}
+
+// RunOptions controls runtime behavior. The flags/options that belong
+// here are the ones that are useful for interactive use. For example,
+// "CommitTrash" is a runtime option rather than a config item because
+// it invokes a troubleshooting feature rather than expressing how
+// balancing is meant to be done at a given site.
+//
+// RunOptions fields are controlled by command line flags.
+type RunOptions struct {
+ Once bool
+ CommitPulls bool
+ CommitTrash bool
+ Logger *log.Logger
+ Dumper *log.Logger
+}
+
+// debugf is a no-op unless the -debug flag replaces it with
+// log.Printf.
+var debugf = func(string, ...interface{}) {}
+
+// main parses flags, loads the JSON config (and optional service
+// list), then either runs one balance operation (-once) or loops via
+// RunForever.
+func main() {
+ var config Config
+ var runOptions RunOptions
+
+ configPath := flag.String("config", "",
+ "`path` of json configuration file")
+ serviceListPath := flag.String("config.KeepServiceList", "",
+ "`path` of json file with list of keep services to balance, as given by \"arv keep_service list\" "+
+ "(default: config[\"KeepServiceList\"], or if none given, get all available services and filter by config[\"KeepServiceTypes\"])")
+ flag.BoolVar(&runOptions.Once, "once", false,
+ "balance once and then exit")
+ flag.BoolVar(&runOptions.CommitPulls, "commit-pulls", false,
+ "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
+ flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
+ "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
+ dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout")
+ debugFlag := flag.Bool("debug", false, "enable debug messages")
+ flag.Usage = usage
+ flag.Parse()
+
+ if *configPath == "" {
+ log.Fatal("You must specify a config file (see `keep-balance -help`)")
+ }
+ mustReadJSON(&config, *configPath)
+ if *serviceListPath != "" {
+ mustReadJSON(&config.KeepServiceList, *serviceListPath)
+ }
+
+ if *debugFlag {
+ debugf = log.Printf
+ if j, err := json.Marshal(config); err != nil {
+ log.Fatal(err)
+ } else {
+ log.Printf("config is %s", j)
+ }
+ }
+ if *dumpFlag {
+ runOptions.Dumper = log.New(os.Stdout, "", log.LstdFlags)
+ }
+ err := CheckConfig(config, runOptions)
+ if err != nil {
+ // (don't run)
+ } else if runOptions.Once {
+ err = (&Balancer{}).Run(config, runOptions)
+ } else {
+ err = RunForever(config, runOptions, nil)
+ }
+ if err != nil {
+ log.Fatal(err)
+ }
+}
+
+// mustReadJSON decodes the JSON file at path into dst, exiting the
+// program on read or decode failure.
+func mustReadJSON(dst interface{}, path string) {
+ if buf, err := ioutil.ReadFile(path); err != nil {
+ log.Fatalf("Reading %q: %v", path, err)
+ } else if err = json.Unmarshal(buf, dst); err != nil {
+ log.Fatalf("Decoding %q: %v", path, err)
+ }
+}
+
+// RunForever runs forever, or (for testing purposes) until the given
+// stop channel is ready to receive. Between runs it waits for the
+// RunPeriod ticker or a SIGUSR1 (which triggers an immediate run).
+func RunForever(config Config, runOptions RunOptions, stop <-chan interface{}) error {
+ if runOptions.Logger == nil {
+ runOptions.Logger = log.New(os.Stderr, "", log.LstdFlags)
+ }
+ logger := runOptions.Logger
+
+ ticker := time.NewTicker(time.Duration(config.RunPeriod))
+
+ // The unbuffered channel here means we only hear SIGUSR1 if
+ // it arrives while we're waiting in select{}.
+ sigUSR1 := make(chan os.Signal)
+ signal.Notify(sigUSR1, syscall.SIGUSR1)
+
+ logger.Printf("starting up: will scan every %v and on SIGUSR1", config.RunPeriod)
+
+ for {
+ if !runOptions.CommitPulls && !runOptions.CommitTrash {
+ logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
+ logger.Print("======= Consider using -commit-pulls and -commit-trash flags.")
+ }
+
+ err := (&Balancer{}).Run(config, runOptions)
+ if err != nil {
+ logger.Print("run failed: ", err)
+ } else {
+ logger.Print("run succeeded")
+ }
+
+ select {
+ case <-stop:
+ signal.Stop(sigUSR1)
+ // Stop the ticker so its resources are released
+ // when we return (it is not garbage-collected
+ // while running).
+ ticker.Stop()
+ return nil
+ case <-ticker.C:
+ logger.Print("timer went off")
+ case <-sigUSR1:
+ logger.Print("received SIGUSR1, resetting timer")
+ // Reset the timer so we don't start the N+1st
+ // run too soon after the Nth run is triggered
+ // by SIGUSR1.
+ ticker.Stop()
+ ticker = time.NewTicker(time.Duration(config.RunPeriod))
+ }
+ logger.Print("starting next run")
+ }
+}
diff --git a/services/keep-balance/main_test.go b/services/keep-balance/main_test.go
new file mode 100644
index 0000000..4a56098
--- /dev/null
+++ b/services/keep-balance/main_test.go
@@ -0,0 +1,43 @@
+package main
+
+import (
+ "encoding/json"
+ "time"
+
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&mainSuite{})
+
+type mainSuite struct{}
+
+// TestExampleJSON verifies the example config in usage.go decodes
+// into the expected Config values.
+func (s *mainSuite) TestExampleJSON(c *check.C) {
+ var config Config
+ c.Check(json.Unmarshal(exampleConfigFile, &config), check.IsNil)
+ c.Check(config.KeepServiceTypes, check.DeepEquals, []string{"disk"})
+ c.Check(config.Client.AuthToken, check.Equals, "xyzzy")
+ c.Check(time.Duration(config.RunPeriod), check.Equals, 600*time.Second)
+}
+
+// TestConfigJSONWithKeepServiceList verifies an explicit
+// KeepServiceList in the config file is decoded correctly.
+func (s *mainSuite) TestConfigJSONWithKeepServiceList(c *check.C) {
+ var config Config
+ c.Check(json.Unmarshal([]byte(`
+ {
+ "Client": {
+ "APIHost": "zzzzz.arvadosapi.com:443",
+ "AuthToken": "xyzzy",
+ "Insecure": false
+ },
+ "KeepServiceList": {
+ "items": [
+ {"uuid":"zzzzz-bi64l-abcdefghijklmno", "service_type":"disk", "service_host":"a.zzzzz.arvadosapi.com", "service_port":12345},
+ {"uuid":"zzzzz-bi64l-bcdefghijklmnop", "service_type":"blob", "service_host":"b.zzzzz.arvadosapi.com", "service_port":12345}
+ ]
+ },
+ "RunPeriod": "600s"
+ }`), &config), check.IsNil)
+ c.Assert(len(config.KeepServiceList.Items), check.Equals, 2)
+ c.Check(config.KeepServiceList.Items[0].UUID, check.Equals, "zzzzz-bi64l-abcdefghijklmno")
+ c.Check(config.KeepServiceList.Items[0].ServicePort, check.Equals, 12345)
+ c.Check(config.Client.AuthToken, check.Equals, "xyzzy")
+}
diff --git a/services/keep-balance/time_me.go b/services/keep-balance/time_me.go
new file mode 100644
index 0000000..e5f16b7
--- /dev/null
+++ b/services/keep-balance/time_me.go
@@ -0,0 +1,14 @@
+package main
+
+import (
+ "log"
+ "time"
+)
+
+// timeMe logs "<label>: start" immediately and returns a func that
+// logs the elapsed time; intended for `defer timeMe(l, "x")()`.
+func timeMe(logger *log.Logger, label string) func() {
+ t0 := time.Now()
+ logger.Printf("%s: start", label)
+ return func() {
+ logger.Printf("%s: took %v", label, time.Since(t0))
+ }
+}
diff --git a/services/keep-balance/usage.go b/services/keep-balance/usage.go
new file mode 100644
index 0000000..eb9990c
--- /dev/null
+++ b/services/keep-balance/usage.go
@@ -0,0 +1,83 @@
+package main
+
+import (
+ "flag"
+ "fmt"
+ "os"
+)
+
+// exampleConfigFile is shown in the -help text and parsed by
+// main_test.go to keep the example valid.
+var exampleConfigFile = []byte(`
+ {
+ "Client": {
+ "APIHost": "zzzzz.arvadosapi.com:443",
+ "AuthToken": "xyzzy",
+ "Insecure": false
+ },
+ "KeepServiceTypes": [
+ "disk"
+ ],
+ "RunPeriod": "600s"
+ }`)
+
+// usage prints the help text (installed as flag.Usage). The text is
+// a runtime string, so corrections here change program output:
+// the program is "keep-balance" (not "keep-service"), and the flags
+// are -commit-pulls / -commit-trash as registered in main().
+func usage() {
+ fmt.Fprintf(os.Stderr, `
+
+keep-balance rebalances a set of keepstore servers. It creates new
+copies of underreplicated blocks, deletes excess copies of
+overreplicated and unreferenced blocks, and moves blocks to better
+positions (according to the rendezvous hash algorithm) so clients find
+them faster.
+
+Usage: keep-balance -config path/to/config.json [options]
+
+Options:
+`)
+ flag.PrintDefaults()
+ fmt.Fprintf(os.Stderr, `
+Example config file:
+%s
+
+ Client.AuthToken must be recognized by Arvados as an admin token,
+ and must be recognized by all Keep services as a "data manager
+ key".
+
+ Client.Insecure should be true if your Arvados API endpoint uses
+ an unverifiable SSL/TLS certificate.
+
+Periodic scanning:
+
+ By default, keep-balance operates periodically, i.e.: do a
+ scan/balance operation, sleep, repeat.
+
+ RunPeriod determines the interval between start times of
+ successive scan/balance operations. If a scan/balance operation
+ takes longer than RunPeriod, the next one will follow it
+ immediately.
+
+ If SIGUSR1 is received during an idle period between operations,
+ the next operation will start immediately.
+
+One-time scanning:
+
+ Use the -once flag to do a single operation and then exit. The
+ exit code will be zero if the operation was successful.
+
+Committing:
+
+ By default, keep-balance computes and reports changes but does not
+ implement them by sending pull and trash lists to the Keep
+ services.
+
+ Use the -commit-pulls and -commit-trash flags to implement the
+ computed changes.
+
+Limitations:
+
+ keep-balance does not attempt to discover whether committed pull
+ and trash requests ever get carried out -- only that they are
+ accepted by the Keep services. If some services are full, new
+ copies of underreplicated blocks might never get made, only
+ repeatedly requested.
+
+`, exampleConfigFile)
+}
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 80d8670..d7da67c 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -4,6 +4,7 @@ import (
"bytes"
"flag"
"fmt"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"io/ioutil"
@@ -331,8 +332,12 @@ func main() {
}
// Initialize Pull queue and worker
+ arv, err := arvadosclient.MakeArvadosClient()
+ if err != nil {
+ log.Fatalf("MakeArvadosClient: %s", err)
+ }
keepClient := &keepclient.KeepClient{
- Arvados: nil,
+ Arvados: &arv,
Want_replicas: 1,
Client: &http.Client{},
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list