[ARVADOS] updated: a20bed65f3df0ea1dcc560d2592beca2285e2bc3
Git user
git at public.curoverse.com
Tue May 31 16:19:28 EDT 2016
Summary of changes:
discards 7fb94d8c6a6501bb3b440bcf72aaeecfbbfe761c (commit)
discards f215d5425814868093bbedc50fe2fe5242681b2c (commit)
discards 335f749ab589d21c87301beb27c0bad84282f6d1 (commit)
discards 8b97af24fe40673cfa7505028e305a39708a96e0 (commit)
discards 5838ef2b29bcc7e65133e482a011649b8464e511 (commit)
discards 2e3dfa6394ea17ab266b4c6e2346a469c63efb74 (commit)
discards a43da81e3e7453bb73d69cca74b82f73a69286eb (commit)
discards 1c470ee799e6b085929155a3942d2a3d47276954 (commit)
discards b364188ab1e87030997168a2cfddac2a41f5b5b9 (commit)
discards a52ae537be1f877ae4242dc6e9d580073999f3e8 (commit)
discards a4c0045623e096ce5950f137e014e0a9bf9fc5d7 (commit)
discards 595f455a425d23e79789c2346a22a4a23520a417 (commit)
discards 30c9c2e679fe43d3a7d5e8770496beca4cf87282 (commit)
discards 6e6a5f4005e7f7c3a41d632ed350f2da918cd01c (commit)
discards 1b6238eb711a3b62a9f6c67460e20b24a2dc1e11 (commit)
discards ac9cacc92f66867673e186716616452cb628352f (commit)
discards c4c44a33aabe63ad95d3bd02899390ae269cd1c8 (commit)
discards 530ab920f030bc3990a79506ede5b8ee9f21188d (commit)
discards 0e011fe94001dcf93758cc740e8a9998c64a3558 (commit)
discards 605ca9ec67ca4587f9385bc1753875c9e702a17e (commit)
discards cf2d28d648724016e578b57f5c5fb92d3f2b2c19 (commit)
discards 917ff758d05b39c88d75affabb00b25fc65846d9 (commit)
discards 6717b69f19af5dd4489bcc97c3e328fc499fa311 (commit)
discards b32b8c95b5a8a1c6fa2e0a541354c46fc11d5055 (commit)
discards 0726bc8d809bc2012d28fac547e8902b89bad45c (commit)
discards 34df3cb56d837687c47931dcaba3acdad81fd1ad (commit)
discards de03c92e6887f1ba9fa05d39e2ce1e5bd9354966 (commit)
discards d836ae6711f710c05980d606ee5bd017587ecbc1 (commit)
discards ae2d40d821f0e8d81d9ba6468980b5eea278a5ed (commit)
discards 177bfc7d6088040164b1242247e13a7c5bc45f0c (commit)
discards 4ed795f09ea2c8a1c3d11faf564183ebf7d70a86 (commit)
discards a0c3e9caf12ba9873c28aae990c1d2abf480f34b (commit)
discards 92d586bf985b811f730be93834f297a10288dcbb (commit)
discards 464e6ca61f815222925c4d75dea0dc8164c50b14 (commit)
discards bf51643f601fdf168d7b45b5e266418f039df5a1 (commit)
discards 5c27dc48c7007c1deff4b2877b2c81906df47015 (commit)
discards 87f2f44c70532acc70885c5b7f528ce4d45cc65d (commit)
discards 92fd8da1e911a2ef39fb23d944f124d4264c3322 (commit)
discards 32899e6b4ca7b5d59e3136f44785d0971a3b19ee (commit)
discards 952e778be341957d42278263bd008181ee0c5d21 (commit)
discards 01aa17fb0f5de5a7e9a287e54ef15008d7af7260 (commit)
discards 24815fb6113db774abb81dd543dbc80926bf5bce (commit)
discards cdfc5f568046d2f39861cea65b62682bc9ddbce1 (commit)
discards 6251cb2459021711d9d9c0c8aee47149b4ed336c (commit)
discards 9fd62c6d9cc009af02aa709d5aadba37e1ce5b5b (commit)
discards abe6659e55121a45342bbacd2651c7b51cd7d4f8 (commit)
discards fa93bba9f4496b2b52ded5234e190f99a3d4fc58 (commit)
via a20bed65f3df0ea1dcc560d2592beca2285e2bc3 (commit)
via b7994dd0a254812b25ff7028d8f2d9023f2e308f (commit)
This update added new revisions after undoing existing revisions. That is
to say, the old revision is not a strict subset of the new revision. This
situation occurs when you --force push a change and generate a repository
containing something like this:
* -- * -- B -- O -- O -- O (7fb94d8c6a6501bb3b440bcf72aaeecfbbfe761c)
\
N -- N -- N (a20bed65f3df0ea1dcc560d2592beca2285e2bc3)
When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit a20bed65f3df0ea1dcc560d2592beca2285e2bc3
Author: Tom Clegg <tom at curoverse.com>
Date: Tue May 24 10:02:39 2016 -0400
9162: Add replication level histogram
Ported from 00a8ece1580a894dbbf9f756685eefc134e4d0d6 by jrandall
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 7471aa6..2a2480c 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -3,6 +3,7 @@ package main
import (
"fmt"
"log"
+ "math"
"os"
"runtime"
"strings"
@@ -447,9 +448,11 @@ type balancerStats struct {
lost, overrep, unref, garbage, underrep, justright blocksNBytes
desired, current blocksNBytes
pulls, trashes int
+ replHistogram []int
}
func (bal *Balancer) getStatistics() (s balancerStats) {
+ s.replHistogram = make([]int, 2)
bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
surplus := len(blk.Replicas) - blk.Desired
bytes := blkid.Size()
@@ -493,6 +496,11 @@ func (bal *Balancer) getStatistics() (s balancerStats) {
s.current.blocks++
s.current.bytes += bytes * int64(len(blk.Replicas))
}
+
+ for len(s.replHistogram) <= len(blk.Replicas) {
+ s.replHistogram = append(s.replHistogram, 0)
+ }
+ s.replHistogram[len(blk.Replicas)]++
})
for _, srv := range bal.KeepServices {
s.pulls += len(srv.ChangeSet.Pulls)
@@ -521,6 +529,25 @@ func (bal *Balancer) PrintStatistics() {
bal.logf("%s: %v\n", srv, srv.ChangeSet)
}
bal.logf("===")
+ bal.printHistogram(s, 60)
+ bal.logf("===")
+}
+
+func (bal *Balancer) printHistogram(s balancerStats, hashColumns int) {
+ bal.logf("Replication level distribution (counting N replicas on a single server as N):")
+ maxCount := 0
+ for _, count := range s.replHistogram {
+ if maxCount < count {
+ maxCount = count
+ }
+ }
+ hashes := strings.Repeat("#", hashColumns)
+ countWidth := 1 + int(math.Log10(float64(maxCount+1)))
+ scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
+ for repl, count := range s.replHistogram {
+ nHashes := int(scaleCount * math.Log10(float64(count+1)))
+ bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
+ }
}
// CheckSanityLate checks for configuration and runtime errors after
commit b7994dd0a254812b25ff7028d8f2d9023f2e308f
Author: Tom Clegg <tom at curoverse.com>
Date: Mon May 16 17:09:21 2016 -0400
9162: Add keep-balance
diff --git a/build/package-build-dockerfiles/Makefile b/build/package-build-dockerfiles/Makefile
index 9216f82..3482886 100644
--- a/build/package-build-dockerfiles/Makefile
+++ b/build/package-build-dockerfiles/Makefile
@@ -20,10 +20,12 @@ ubuntu1404/generated: common-generated-all
test -d ubuntu1404/generated || mkdir ubuntu1404/generated
cp -rlt ubuntu1404/generated common-generated/*
-common-generated-all: common-generated/golang-amd64.tar.gz
+GOTARBALL=go1.6.2.linux-amd64.tar.gz
-common-generated/golang-amd64.tar.gz: common-generated
- wget -cqO common-generated/golang-amd64.tar.gz http://storage.googleapis.com/golang/go1.4.2.linux-amd64.tar.gz
+common-generated-all: common-generated/$(GOTARBALL)
+
+common-generated/$(GOTARBALL): common-generated
+ wget -cqO common-generated/$(GOTARBALL) http://storage.googleapis.com/golang/$(GOTARBALL)
common-generated:
mkdir common-generated
diff --git a/build/package-build-dockerfiles/centos6/Dockerfile b/build/package-build-dockerfiles/centos6/Dockerfile
index c21c091..570dde1 100644
--- a/build/package-build-dockerfiles/centos6/Dockerfile
+++ b/build/package-build-dockerfiles/centos6/Dockerfile
@@ -5,7 +5,7 @@ MAINTAINER Brett Smith <brett at curoverse.com>
RUN yum -q -y install make automake gcc gcc-c++ libyaml-devel patch readline-devel zlib-devel libffi-devel openssl-devel bzip2 libtool bison sqlite-devel rpm-build git perl-ExtUtils-MakeMaker libattr-devel nss-devel libcurl-devel which tar unzip scl-utils centos-release-scl postgresql-devel
# Install golang binary
-ADD generated/golang-amd64.tar.gz /usr/local/
+ADD generated/go1.6.2.linux-amd64.tar.gz /usr/local/
RUN ln -s /usr/local/go/bin/go /usr/local/bin/
# Install RVM
diff --git a/build/package-build-dockerfiles/debian7/Dockerfile b/build/package-build-dockerfiles/debian7/Dockerfile
index ccc444c..ddad542 100644
--- a/build/package-build-dockerfiles/debian7/Dockerfile
+++ b/build/package-build-dockerfiles/debian7/Dockerfile
@@ -13,7 +13,7 @@ RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
/usr/local/rvm/bin/rvm-exec default gem install cure-fpm --version 1.6.0b
# Install golang binary
-ADD generated/golang-amd64.tar.gz /usr/local/
+ADD generated/go1.6.2.linux-amd64.tar.gz /usr/local/
RUN ln -s /usr/local/go/bin/go /usr/local/bin/
ENV WORKSPACE /arvados
diff --git a/build/package-build-dockerfiles/debian8/Dockerfile b/build/package-build-dockerfiles/debian8/Dockerfile
index e32cfb4..80f06a2 100644
--- a/build/package-build-dockerfiles/debian8/Dockerfile
+++ b/build/package-build-dockerfiles/debian8/Dockerfile
@@ -13,7 +13,7 @@ RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
/usr/local/rvm/bin/rvm-exec default gem install cure-fpm --version 1.6.0b
# Install golang binary
-ADD generated/golang-amd64.tar.gz /usr/local/
+ADD generated/go1.6.2.linux-amd64.tar.gz /usr/local/
RUN ln -s /usr/local/go/bin/go /usr/local/bin/
ENV WORKSPACE /arvados
diff --git a/build/package-build-dockerfiles/ubuntu1204/Dockerfile b/build/package-build-dockerfiles/ubuntu1204/Dockerfile
index 34bf698..2f628b0 100644
--- a/build/package-build-dockerfiles/ubuntu1204/Dockerfile
+++ b/build/package-build-dockerfiles/ubuntu1204/Dockerfile
@@ -13,7 +13,7 @@ RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
/usr/local/rvm/bin/rvm-exec default gem install cure-fpm --version 1.6.0b
# Install golang binary
-ADD generated/golang-amd64.tar.gz /usr/local/
+ADD generated/go1.6.2.linux-amd64.tar.gz /usr/local/
RUN ln -s /usr/local/go/bin/go /usr/local/bin/
ENV WORKSPACE /arvados
diff --git a/build/package-build-dockerfiles/ubuntu1404/Dockerfile b/build/package-build-dockerfiles/ubuntu1404/Dockerfile
index 5377164..b9c003a 100644
--- a/build/package-build-dockerfiles/ubuntu1404/Dockerfile
+++ b/build/package-build-dockerfiles/ubuntu1404/Dockerfile
@@ -13,7 +13,7 @@ RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
/usr/local/rvm/bin/rvm-exec default gem install cure-fpm --version 1.6.0b
# Install golang binary
-ADD generated/golang-amd64.tar.gz /usr/local/
+ADD generated/go1.6.2.linux-amd64.tar.gz /usr/local/
RUN ln -s /usr/local/go/bin/go /usr/local/bin/
ENV WORKSPACE /arvados
diff --git a/build/run-build-packages-one-target.sh b/build/run-build-packages-one-target.sh
index 322129e..6fdffd0 100755
--- a/build/run-build-packages-one-target.sh
+++ b/build/run-build-packages-one-target.sh
@@ -128,9 +128,10 @@ if test -z "$packages" ; then
arvados-src
arvados-workbench
crunchstat
+ keep-balance
+ keep-block-check
keepproxy
keep-rsync
- keep-block-check
keepstore
keep-web
libarvados-perl"
diff --git a/build/run-build-packages.sh b/build/run-build-packages.sh
index 87d396a..32ca0d8 100755
--- a/build/run-build-packages.sh
+++ b/build/run-build-packages.sh
@@ -383,6 +383,8 @@ package_go_binary services/keepstore keepstore \
"Keep storage daemon, accessible to clients on the LAN"
package_go_binary services/keepproxy keepproxy \
"Make a Keep cluster accessible to clients that are not on the LAN"
+package_go_binary services/keep-balance keep-balance \
+ "Rebalance and garbage-collect data blocks stored in Arvados Keep"
package_go_binary services/keep-web keep-web \
"Static web hosting service for user data stored in Arvados Keep"
package_go_binary services/datamanager arvados-data-manager \
diff --git a/build/run-tests.sh b/build/run-tests.sh
index d229341..30a80f5 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -70,6 +70,7 @@ services/fuse
services/keep-web
services/keepproxy
services/keepstore
+services/keep-balance
services/login-sync
services/nodemanager
services/crunch-run
@@ -79,6 +80,7 @@ sdk/cli
sdk/pam
sdk/python
sdk/ruby
+sdk/go/arvados
sdk/go/arvadosclient
sdk/go/keepclient
sdk/go/httpserver
@@ -150,6 +152,8 @@ sanity_checks() {
echo -n 'go: '
go version \
|| fatal "No go binary. See http://golang.org/doc/install"
+ [[ $(go version) =~ go1.([0-9]+) ]] && [[ ${BASH_REMATCH[1]} -ge 6 ]] \
+ || fatal "Go >= 1.6 required. See http://golang.org/doc/install"
echo -n 'gcc: '
gcc --version | egrep ^gcc \
|| fatal "No gcc. Try: apt-get install build-essential"
@@ -499,7 +503,7 @@ do_test_once() {
then
# "go test -check.vv giturl" doesn't work, but this
# does:
- cd "$WORKSPACE/$1" && go test ${short:+-short} ${coverflags[@]} ${testargs[$1]}
+ cd "$WORKSPACE/$1" && go test ${short:+-short} ${testargs[$1]}
else
# The above form gets verbose even when testargs is
# empty, so use this form in such cases:
@@ -703,6 +707,7 @@ do_install services/api apiserver
declare -a gostuff
gostuff=(
+ sdk/go/arvados
sdk/go/arvadosclient
sdk/go/blockdigest
sdk/go/httpserver
@@ -714,6 +719,7 @@ gostuff=(
services/keep-web
services/keepstore
sdk/go/keepclient
+ services/keep-balance
services/keepproxy
services/datamanager/summary
services/datamanager/collection
diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go
new file mode 100644
index 0000000..ee830c8
--- /dev/null
+++ b/sdk/go/arvados/client.go
@@ -0,0 +1,169 @@
+package arvados
+
+import (
+ "crypto/tls"
+ "encoding/json"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "net/url"
+ "os"
+)
+
+// A Client is an HTTP client with an API endpoint and a set of
+// Arvados credentials.
+//
+// It offers methods for accessing individual Arvados APIs, and
+// methods that implement common patterns like fetching multiple pages
+// of results using List APIs.
+type Client struct {
+ // HTTP client used to make requests. If nil,
+ // http.DefaultClient or InsecureHTTPClient will be used.
+ Client *http.Client
+
+ // Hostname (or host:port) of Arvados API server.
+ APIHost string
+
+ // User authentication token.
+ AuthToken string
+
+ // Accept unverified certificates. This works only if the
+ // Client field is nil: otherwise, it has no effect.
+ Insecure bool
+}
+
+// The default http.Client used by a Client with Insecure==true and
+// Client==nil.
+var InsecureHTTPClient = &http.Client{
+ Transport: &http.Transport{
+ TLSClientConfig: &tls.Config{
+ InsecureSkipVerify: true}}}
+
+// NewClientFromEnv creates a new Client that uses the default HTTP
+// client with the API endpoint and credentials given by the
+// ARVADOS_API_* environment variables.
+func NewClientFromEnv() *Client {
+ return &Client{
+ APIHost: os.Getenv("ARVADOS_API_HOST"),
+ AuthToken: os.Getenv("ARVADOS_API_TOKEN"),
+ Insecure: os.Getenv("ARVADOS_API_HOST_INSECURE") != "",
+ }
+}
+
+// Do adds authentication headers and then calls (*http.Client)Do().
+func (c *Client) Do(req *http.Request) (*http.Response, error) {
+ if c.AuthToken != "" {
+ req.Header.Add("Authorization", "OAuth2 "+c.AuthToken)
+ }
+ return c.httpClient().Do(req)
+}
+
+// DoAndDecode performs req and unmarshals the response (which must be
+// JSON) into dst. Use this instead of RequestAndDecode if you need
+// more control of the http.Request object.
+func (c *Client) DoAndDecode(dst interface{}, req *http.Request) error {
+ resp, err := c.Do(req)
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+ buf, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return err
+ }
+ if resp.StatusCode != 200 {
+ return fmt.Errorf("request failed (%s): %s", req.URL, resp.Status)
+ }
+ if dst == nil {
+ return nil
+ }
+ return json.Unmarshal(buf, dst)
+}
+
+// RequestAndDecode performs an API request and unmarshals the
+// response (which must be JSON) into dst. Method and body arguments
+// are the same as for http.NewRequest(). The given path is added to
+// the server's scheme/host/port to form the request URL. The given
+// params are passed via POST form or query string.
+//
+// path must not contain a query string.
+func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error {
+ urlString := c.apiURL(path)
+ var urlValues url.Values
+ if v, ok := params.(url.Values); ok {
+ urlValues = v
+ } else if params != nil {
+ // Convert an arbitrary struct to url.Values. For
+ // example, Foo{Bar: []int{1,2,3}, Baz: "waz"} becomes
+ // url.Values{`bar`:`{"a":[1,2,3]}`,`Baz`:`waz`}
+ //
+ // TODO: Do this more efficiently, possibly using
+ // json.Decode/Encode, so the whole thing doesn't have
+ // to get encoded, decoded, and re-encoded.
+ j, err := json.Marshal(params)
+ if err != nil {
+ return err
+ }
+ var generic map[string]interface{}
+ err = json.Unmarshal(j, &generic)
+ if err != nil {
+ return err
+ }
+ urlValues = url.Values{}
+ for k, v := range generic {
+ if v, ok := v.(string); ok {
+ urlValues.Set(k, v)
+ continue
+ }
+ j, err := json.Marshal(v)
+ if err != nil {
+ return err
+ }
+ urlValues.Set(k, string(j))
+ }
+ }
+ if (method == "GET" || body != nil) && urlValues != nil {
+ // FIXME: what if params don't fit in URL
+ u, err := url.Parse(urlString)
+ if err != nil {
+ return err
+ }
+ u.RawQuery = urlValues.Encode()
+ urlString = u.String()
+ }
+ req, err := http.NewRequest(method, urlString, body)
+ if err != nil {
+ return err
+ }
+ return c.DoAndDecode(dst, req)
+}
+
+func (c *Client) httpClient() *http.Client {
+ switch {
+ case c.Client != nil:
+ return c.Client
+ case c.Insecure:
+ return InsecureHTTPClient
+ default:
+ return http.DefaultClient
+ }
+}
+
+func (c *Client) apiURL(path string) string {
+ return "https://" + c.APIHost + "/" + path
+}
+
+// DiscoveryDocument is the Arvados server's description of itself.
+type DiscoveryDocument struct {
+ DefaultCollectionReplication int `json:"defaultCollectionReplication"`
+ BlobSignatureTTL int64 `json:"blobSignatureTtl"`
+}
+
+// DiscoveryDocument returns a *DiscoveryDocument. The returned object
+// should not be modified: the same object may be returned by
+// subsequent calls.
+func (c *Client) DiscoveryDocument() (*DiscoveryDocument, error) {
+ var dd DiscoveryDocument
+ return &dd, c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil)
+}
diff --git a/sdk/go/arvados/client_test.go b/sdk/go/arvados/client_test.go
new file mode 100644
index 0000000..2db50bf
--- /dev/null
+++ b/sdk/go/arvados/client_test.go
@@ -0,0 +1,83 @@
+package arvados
+
+import (
+ "bytes"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "sync"
+ "testing"
+)
+
+type stubTransport struct {
+ Responses map[string]string
+ Requests []http.Request
+ sync.Mutex
+}
+
+func (stub *stubTransport) RoundTrip(req *http.Request) (*http.Response, error) {
+ stub.Lock()
+ stub.Requests = append(stub.Requests, *req)
+ stub.Unlock()
+
+ resp := &http.Response{
+ Status: "200 OK",
+ StatusCode: 200,
+ Proto: "HTTP/1.1",
+ ProtoMajor: 1,
+ ProtoMinor: 1,
+ Request: req,
+ }
+ str := stub.Responses[req.URL.Path]
+ if str == "" {
+ resp.Status = "404 Not Found"
+ resp.StatusCode = 404
+ str = "{}"
+ }
+ buf := bytes.NewBufferString(str)
+ resp.Body = ioutil.NopCloser(buf)
+ resp.ContentLength = int64(buf.Len())
+ return resp, nil
+}
+
+type errorTransport struct{}
+
+func (stub *errorTransport) RoundTrip(req *http.Request) (*http.Response, error) {
+ return nil, fmt.Errorf("something awful happened")
+}
+
+func TestCurrentUser(t *testing.T) {
+ t.Parallel()
+ stub := &stubTransport{
+ Responses: map[string]string{
+ "/arvados/v1/users/current": `{"uuid":"zzzzz-abcde-012340123401234"}`,
+ },
+ }
+ c := &Client{
+ Client: &http.Client{
+ Transport: stub,
+ },
+ APIHost: "zzzzz.arvadosapi.com",
+ AuthToken: "xyzzy",
+ }
+ u, err := c.CurrentUser()
+ if err != nil {
+ t.Fatal(err)
+ }
+ if x := "zzzzz-abcde-012340123401234"; u.UUID != x {
+ t.Errorf("got uuid %q, expected %q", u.UUID, x)
+ }
+ if len(stub.Requests) < 1 {
+ t.Fatal("empty stub.Requests")
+ }
+ hdr := stub.Requests[len(stub.Requests)-1].Header
+ if hdr.Get("Authorization") != "OAuth2 xyzzy" {
+ t.Errorf("got headers %+q, expected Authorization header", hdr)
+ }
+
+ c.Client.Transport = &errorTransport{}
+ u, err = c.CurrentUser()
+ if err == nil {
+ t.Errorf("got nil error, expected something awful")
+ }
+}
diff --git a/sdk/go/arvados/collection.go b/sdk/go/arvados/collection.go
new file mode 100644
index 0000000..71f5247
--- /dev/null
+++ b/sdk/go/arvados/collection.go
@@ -0,0 +1,62 @@
+package arvados
+
+import (
+ "bufio"
+ "fmt"
+ "strings"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/manifest"
+)
+
+// Collection is an arvados#collection resource.
+type Collection struct {
+ UUID string `json:"uuid,omitempty"`
+ ExpiresAt *time.Time `json:"expires_at,omitempty"`
+ ManifestText string `json:"manifest_text,omitempty"`
+ CreatedAt *time.Time `json:"created_at,omitempty"`
+ ModifiedAt *time.Time `json:"modified_at,omitempty"`
+ PortableDataHash string `json:"portable_data_hash,omitempty"`
+ ReplicationConfirmed *int `json:"replication_confirmed,omitempty"`
+ ReplicationConfirmedAt *time.Time `json:"replication_confirmed_at,omitempty"`
+ ReplicationDesired *int `json:"replication_desired,omitempty"`
+}
+
+// SizedDigests returns the hash+size part of each data block
+// referenced by the collection.
+func (c *Collection) SizedDigests() ([]SizedDigest, error) {
+ if c.ManifestText == "" && c.PortableDataHash != "d41d8cd98f00b204e9800998ecf8427e+0" {
+ // TODO: Check more subtle forms of corruption, too
+ return nil, fmt.Errorf("manifest is missing")
+ }
+ var sds []SizedDigest
+ scanner := bufio.NewScanner(strings.NewReader(c.ManifestText))
+ scanner.Buffer(make([]byte, 1048576), len(c.ManifestText))
+ for scanner.Scan() {
+ line := scanner.Text()
+ tokens := strings.Split(line, " ")
+ if len(tokens) < 3 {
+ return nil, fmt.Errorf("Invalid stream (<3 tokens): %q", line)
+ }
+ for _, token := range tokens[1:] {
+ if !manifest.LocatorPattern.MatchString(token) {
+ // FIXME: ensure it's a file token
+ break
+ }
+ // FIXME: shouldn't assume 32 char hash
+ if i := strings.IndexRune(token[33:], '+'); i >= 0 {
+ token = token[:33+i]
+ }
+ sds = append(sds, SizedDigest(token))
+ }
+ }
+ return sds, scanner.Err()
+}
+
+// CollectionList is an arvados#collectionList resource.
+type CollectionList struct {
+ Items []Collection `json:"items"`
+ ItemsAvailable int `json:"items_available"`
+ Offset int `json:"offset"`
+ Limit int `json:"limit"`
+}
diff --git a/sdk/go/arvados/doc.go b/sdk/go/arvados/doc.go
new file mode 100644
index 0000000..1e8141e
--- /dev/null
+++ b/sdk/go/arvados/doc.go
@@ -0,0 +1,12 @@
+// Package arvados is a client library for Arvados.
+//
+// The API is not stable: it should be considered experimental
+// pre-release.
+//
+// The intent is to offer model types and API call functions that can
+// be generated automatically (or at least mostly automatically) from
+// a discovery document. For the time being, there is a manually
+// generated subset of those types and API calls with (approximately)
+// the right signatures, plus client/authentication support and some
+// convenience functions.
+package arvados
diff --git a/sdk/go/arvados/duration.go b/sdk/go/arvados/duration.go
new file mode 100644
index 0000000..1639c58
--- /dev/null
+++ b/sdk/go/arvados/duration.go
@@ -0,0 +1,31 @@
+package arvados
+
+import (
+ "encoding/json"
+ "fmt"
+ "time"
+)
+
+// Duration is time.Duration but looks like "12s" in JSON, rather than
+// a number of nanoseconds.
+type Duration time.Duration
+
+// UnmarshalJSON implements json.Unmarshaler
+func (d *Duration) UnmarshalJSON(data []byte) error {
+ if data[0] == '"' {
+ dur, err := time.ParseDuration(string(data[1 : len(data)-1]))
+ *d = Duration(dur)
+ return err
+ }
+ return fmt.Errorf("duration must be given as a string like \"600s\" or \"1h30m\"")
+}
+
+// MarshalJSON implements json.Marshaler
+func (d *Duration) MarshalJSON() ([]byte, error) {
+ return json.Marshal(d.String())
+}
+
+// String implements fmt.Stringer
+func (d Duration) String() string {
+ return time.Duration(d).String()
+}
diff --git a/sdk/go/arvados/keep_block.go b/sdk/go/arvados/keep_block.go
new file mode 100644
index 0000000..c9a7712
--- /dev/null
+++ b/sdk/go/arvados/keep_block.go
@@ -0,0 +1,15 @@
+package arvados
+
+import (
+ "strconv"
+ "strings"
+)
+
+// SizedDigest is a minimal Keep block locator: hash+size
+type SizedDigest string
+
+// Size returns the size of the data block, in bytes.
+func (sd SizedDigest) Size() int64 {
+ n, _ := strconv.ParseInt(strings.Split(string(sd), "+")[1], 10, 64)
+ return n
+}
diff --git a/sdk/go/arvados/keep_service.go b/sdk/go/arvados/keep_service.go
new file mode 100644
index 0000000..4af1b79
--- /dev/null
+++ b/sdk/go/arvados/keep_service.go
@@ -0,0 +1,123 @@
+package arvados
+
+import (
+ "bufio"
+ "fmt"
+ "net/http"
+ "strconv"
+ "strings"
+)
+
+// KeepService is an arvados#keepService record
+type KeepService struct {
+ UUID string `json:"uuid"`
+ ServiceHost string `json:"service_host"`
+ ServicePort int `json:"service_port"`
+ ServiceSSLFlag bool `json:"service_ssl_flag"`
+ ServiceType string `json:"service_type"`
+ ReadOnly bool `json:"read_only"`
+}
+
+// KeepServiceList is an arvados#keepServiceList record
+type KeepServiceList struct {
+ Items []KeepService `json:"items"`
+ ItemsAvailable int `json:"items_available"`
+ Offset int `json:"offset"`
+ Limit int `json:"limit"`
+}
+
+// KeepServiceIndexEntry is what a keep service's index response tells
+// us about a stored block.
+type KeepServiceIndexEntry struct {
+ SizedDigest
+ Mtime int64
+}
+
+// EachKeepService calls f once for every readable
+// KeepService. EachKeepService stops if it encounters an
+// error, such as f returning a non-nil error.
+func (c *Client) EachKeepService(f func(KeepService) error) error {
+ params := ResourceListParams{}
+ for {
+ var page KeepServiceList
+ err := c.RequestAndDecode(&page, "GET", "arvados/v1/keep_services", nil, params)
+ if err != nil {
+ return err
+ }
+ for _, item := range page.Items {
+ err = f(item)
+ if err != nil {
+ return err
+ }
+ }
+ params.Offset = params.Offset + len(page.Items)
+ if params.Offset >= page.ItemsAvailable {
+ return nil
+ }
+ }
+}
+
+func (s *KeepService) url(path string) string {
+ var f string
+ if s.ServiceSSLFlag {
+ f = "https://%s:%d/%s"
+ } else {
+ f = "http://%s:%d/%s"
+ }
+ return fmt.Sprintf(f, s.ServiceHost, s.ServicePort, path)
+}
+
+// String implements fmt.Stringer
+func (s *KeepService) String() string {
+ return s.UUID
+}
+
+// Index returns an unsorted list of blocks that can be retrieved from
+// this server.
+func (s *KeepService) Index(c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
+ url := s.url("index/" + prefix)
+ req, err := http.NewRequest("GET", url, nil)
+ if err != nil {
+ return nil, fmt.Errorf("NewRequest(%v): %v", url, err)
+ }
+ resp, err := c.Do(req)
+ if err != nil {
+ return nil, fmt.Errorf("Do(%v): %v", url, err)
+ } else if resp.StatusCode != 200 {
+ return nil, fmt.Errorf("%v: %v", url, resp.Status)
+ }
+ defer resp.Body.Close()
+
+ var entries []KeepServiceIndexEntry
+ scanner := bufio.NewScanner(resp.Body)
+ sawEOF := false
+ for scanner.Scan() {
+ if sawEOF {
+ return nil, fmt.Errorf("Index response contained non-terminal blank line")
+ }
+ line := scanner.Text()
+ if line == "" {
+ sawEOF = true
+ continue
+ }
+ fields := strings.Split(line, " ")
+ if len(fields) != 2 {
+ return nil, fmt.Errorf("Malformed index line %q: %d fields", line, len(fields))
+ }
+ mtime, err := strconv.ParseInt(fields[1], 10, 64)
+ if err != nil {
+ return nil, fmt.Errorf("Malformed index line %q: mtime: %v", line, err)
+ }
+ entries = append(entries, KeepServiceIndexEntry{
+ SizedDigest: SizedDigest(fields[0]),
+ Mtime: mtime,
+ })
+ }
+ if err := scanner.Err(); err != nil {
+ return nil, fmt.Errorf("Error scanning index response: %v", err)
+ }
+ if !sawEOF {
+ return nil, fmt.Errorf("Index response had no EOF marker")
+ }
+ return entries, nil
+}
diff --git a/sdk/go/arvados/resource_list.go b/sdk/go/arvados/resource_list.go
new file mode 100644
index 0000000..e9ea268
--- /dev/null
+++ b/sdk/go/arvados/resource_list.go
@@ -0,0 +1,25 @@
+package arvados
+
+import "encoding/json"
+
+// ResourceListParams expresses which results are requested in a
+// list/index API.
+type ResourceListParams struct {
+ Select []string `json:"select,omitempty"`
+ Filters []Filter `json:"filters,omitempty"`
+ Limit *int `json:"limit,omitempty"`
+ Offset int `json:"offset,omitempty"`
+ Order string `json:"order,omitempty"`
+}
+
+// A Filter restricts the set of records returned by a list/index API.
+type Filter struct {
+ Attr string
+ Operator string
+ Operand interface{}
+}
+
+// MarshalJSON encodes a Filter in the form expected by the API.
+func (f *Filter) MarshalJSON() ([]byte, error) {
+ return json.Marshal([]interface{}{f.Attr, f.Operator, f.Operand})
+}
diff --git a/sdk/go/arvados/resource_list_test.go b/sdk/go/arvados/resource_list_test.go
new file mode 100644
index 0000000..b5e6e7d
--- /dev/null
+++ b/sdk/go/arvados/resource_list_test.go
@@ -0,0 +1,21 @@
+package arvados
+
+import (
+ "bytes"
+ "encoding/json"
+ "testing"
+ "time"
+)
+
+func TestMarshalFiltersWithNanoseconds(t *testing.T) {
+ t0 := time.Now()
+ t0str := t0.Format(time.RFC3339Nano)
+ buf, err := json.Marshal([]Filter{
+ {Attr: "modified_at", Operator: "=", Operand: t0}})
+ if err != nil {
+ t.Fatal(err)
+ }
+ if expect := []byte(`[["modified_at","=","` + t0str + `"]]`); 0 != bytes.Compare(buf, expect) {
+ t.Errorf("Encoded as %q, expected %q", buf, expect)
+ }
+}
diff --git a/sdk/go/arvados/user.go b/sdk/go/arvados/user.go
new file mode 100644
index 0000000..684a3af
--- /dev/null
+++ b/sdk/go/arvados/user.go
@@ -0,0 +1,17 @@
+package arvados
+
+// User is an arvados#user record
+type User struct {
+ UUID string `json:"uuid,omitempty"`
+ IsActive bool `json:"is_active"`
+ IsAdmin bool `json:"is_admin"`
+ Username string `json:"username,omitempty"`
+}
+
+// CurrentUser calls arvados.v1.users.current, and returns the User
+// record corresponding to this client's credentials.
+func (c *Client) CurrentUser() (User, error) {
+ var u User
+ err := c.RequestAndDecode(&u, "GET", "arvados/v1/users/current", nil, nil)
+ return u, err
+}
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
new file mode 100644
index 0000000..7471aa6
--- /dev/null
+++ b/services/keep-balance/balance.go
@@ -0,0 +1,611 @@
+package main
+
+import (
+ "fmt"
+ "log"
+ "os"
+ "runtime"
+ "strings"
+ "sync"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+)
+
+// CheckConfig returns an error if anything is wrong with the given
+// config and runOptions.
+func CheckConfig(config Config, runOptions RunOptions) error {
+ if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
+ return fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
+ }
+ if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
+ return fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
+ }
+ return nil
+}
+
+// Balancer compares the contents of keepstore servers with the
+// collections stored in Arvados, and issues pull/trash requests
+// needed to get (closer to) the optimal data layout.
+//
+// In the optimal data layout: every data block referenced by a
+// collection is replicated at least as many times as desired by the
+// collection; there are no unreferenced data blocks older than
+// BlobSignatureTTL; and all N existing replicas of a given data block
+// are in the N best positions in rendezvous probe order.
+type Balancer struct {
+ *BlockStateMap
+ KeepServices map[string]*KeepService
+ DefaultReplication int
+ Logger *log.Logger
+ Dumper *log.Logger
+ MinMtime int64
+
+ collScanned int
+ serviceRoots map[string]string
+ errors []error
+ mutex sync.Mutex
+}
+
+// Run performs a balance operation using the given config and
+// runOptions. It should only be called once on a given Balancer
+// object. Typical usage:
+//
+// err = (&Balancer{}).Run(config, runOptions)
+func (bal *Balancer) Run(config Config, runOptions RunOptions) (err error) {
+ bal.Dumper = runOptions.Dumper
+ bal.Logger = runOptions.Logger
+ if bal.Logger == nil {
+ bal.Logger = log.New(os.Stderr, "", log.LstdFlags)
+ }
+
+ defer timeMe(bal.Logger, "Run")()
+
+ if len(config.KeepServiceList.Items) > 0 {
+ err = bal.SetKeepServices(config.KeepServiceList)
+ } else {
+ err = bal.DiscoverKeepServices(&config.Client, config.KeepServiceTypes)
+ }
+ if err != nil {
+ return
+ }
+
+ if err = bal.CheckSanityEarly(&config.Client); err != nil {
+ return
+ }
+ if runOptions.CommitTrash {
+ if err = bal.ClearTrashLists(&config.Client); err != nil {
+ return
+ }
+ }
+ if err = bal.GetCurrentState(&config.Client); err != nil {
+ return
+ }
+ bal.ComputeChangeSets()
+ bal.PrintStatistics()
+ if err = bal.CheckSanityLate(); err != nil {
+ return
+ }
+ if runOptions.CommitPulls {
+ err = bal.CommitPulls(&config.Client)
+ if err != nil {
+ // Skip trash if we can't pull. (Too cautious?)
+ return
+ }
+ }
+ if runOptions.CommitTrash {
+ err = bal.CommitTrash(&config.Client)
+ }
+ return
+}
+
+// SetKeepServices sets the list of KeepServices to operate on.
+func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
+ bal.KeepServices = make(map[string]*KeepService)
+ for _, srv := range srvList.Items {
+ bal.KeepServices[srv.UUID] = &KeepService{
+ KeepService: srv,
+ ChangeSet: &ChangeSet{},
+ }
+ }
+ return nil
+}
+
+// DiscoverKeepServices sets the list of KeepServices by calling the
+// API to get a list of all services, and selecting the ones whose
+// ServiceType is in okTypes.
+func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error {
+ bal.KeepServices = make(map[string]*KeepService)
+ ok := make(map[string]bool)
+ for _, t := range okTypes {
+ ok[t] = true
+ }
+ return c.EachKeepService(func(srv arvados.KeepService) error {
+ if ok[srv.ServiceType] {
+ bal.KeepServices[srv.UUID] = &KeepService{
+ KeepService: srv,
+ ChangeSet: &ChangeSet{},
+ }
+ } else {
+ bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
+ }
+ return nil
+ })
+}
+
+// CheckSanityEarly checks for configuration and runtime errors that
+// can be detected before GetCurrentState() and ComputeChangeSets()
+// are called.
+//
+// If it returns an error, it is pointless to run GetCurrentState or
+// ComputeChangeSets: after doing so, the statistics would be
+// meaningless and it would be dangerous to run any Commit methods.
+func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
+ u, err := c.CurrentUser()
+ if err != nil {
+ return fmt.Errorf("CurrentUser(): %v", err)
+ }
+ if !u.IsActive || !u.IsAdmin {
+ return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
+ }
+ for _, srv := range bal.KeepServices {
+ if srv.ServiceType == "proxy" {
+ return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
+ }
+ }
+ return nil
+}
+
+// ClearTrashLists sends an empty trash list to each keep
+// service. Calling this before GetCurrentState avoids races.
+//
+// When a block appears in an index, we assume that replica will still
+// exist after we delete other replicas on other servers. However,
+// it's possible that a previous rebalancing operation made different
+// decisions (e.g., servers were added/removed, and rendezvous order
+// changed). In this case, the replica might already be on that
+// server's trash list, and it might be deleted before we send a
+// replacement trash list.
+//
+// We avoid this problem if we clear all trash lists before getting
+// indexes. (We also assume there is only one rebalancing process
+// running at a time.)
+func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
+ for _, srv := range bal.KeepServices {
+ srv.ChangeSet = &ChangeSet{}
+ }
+ return bal.CommitTrash(c)
+}
+
+// GetCurrentState determines the current replication state, and the
+// desired replication level, for every block that is either
+// retrievable or referenced.
+//
+// It determines the current replication state by reading the block index
+// from every known Keep service.
+//
+// It determines the desired replication level by retrieving all
+// collection manifests in the database (API server).
+//
+// It encodes the resulting information in BlockStateMap.
+func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
+ defer timeMe(bal.Logger, "GetCurrentState")()
+ bal.BlockStateMap = NewBlockStateMap()
+
+ dd, err := c.DiscoveryDocument()
+ if err != nil {
+ return err
+ }
+ bal.DefaultReplication = dd.DefaultCollectionReplication
+ bal.MinMtime = time.Now().Unix() - dd.BlobSignatureTTL
+
+ errs := make(chan error, 2+len(bal.KeepServices))
+ wg := sync.WaitGroup{}
+
+ // Start one goroutine for each KeepService: retrieve the
+ // index, and add the returned blocks to BlockStateMap.
+ for _, srv := range bal.KeepServices {
+ wg.Add(1)
+ go func(srv *KeepService) {
+ defer wg.Done()
+ bal.logf("%s: retrieve index", srv)
+ idx, err := srv.Index(c, "")
+ if err != nil {
+ errs <- fmt.Errorf("%s: %v", srv, err)
+ return
+ }
+ bal.logf("%s: add %d replicas to map", srv, len(idx))
+ bal.BlockStateMap.AddReplicas(srv, idx)
+ bal.logf("%s: done", srv)
+ }(srv)
+ }
+
+ // collQ buffers incoming collections so we can start fetching
+ // the next page without waiting for the current page to
+ // finish processing. (1000 happens to match the page size
+ // used by (*arvados.Client)EachCollection(), but it's OK if
+ // they don't match.)
+ collQ := make(chan arvados.Collection, 1000)
+
+ // Start a goroutine to process collections. (We could use a
+ // worker pool here, but even with a single worker we already
+ // process collections much faster than we can retrieve them.)
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for coll := range collQ {
+ err := bal.addCollection(coll)
+ if err != nil {
+ errs <- err
+ for range collQ {
+ }
+ return
+ }
+ bal.collScanned++
+ }
+ }()
+
+ // Start a goroutine to retrieve all collections from the
+ // Arvados database and send them to collQ for processing.
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ err = EachCollection(c,
+ func(coll arvados.Collection) error {
+ collQ <- coll
+ if len(errs) > 0 {
+ // some other GetCurrentState
+ // error happened: no point
+ // getting any more
+ // collections.
+ return fmt.Errorf("")
+ }
+ return nil
+ }, func(done, total int) {
+ bal.logf("collections: %d/%d", done, total)
+ })
+ close(collQ)
+ if err != nil {
+ errs <- err
+ }
+ }()
+
+ go func() {
+ // Send a nil error when all goroutines finish. If
+ // this is the first error sent to errs, then
+ // everything worked.
+ wg.Wait()
+ errs <- nil
+ }()
+ return <-errs
+}
+
+func (bal *Balancer) addCollection(coll arvados.Collection) error {
+ blkids, err := coll.SizedDigests()
+ if err != nil {
+ bal.mutex.Lock()
+ bal.errors = append(bal.errors, fmt.Errorf("%v: %v", coll.UUID, err))
+ bal.mutex.Unlock()
+ return nil
+ }
+ repl := bal.DefaultReplication
+ if coll.ReplicationDesired != nil {
+ repl = *coll.ReplicationDesired
+ }
+ debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
+ bal.BlockStateMap.IncreaseDesired(repl, blkids)
+ return nil
+}
+
+// ComputeChangeSets compares, for each known block, the current and
+// desired replication states. If it is possible to get closer to the
+// desired state by copying or deleting blocks, it adds those changes
+// to the relevant KeepServices' ChangeSets.
+//
+// It does not actually apply any of the computed changes.
+func (bal *Balancer) ComputeChangeSets() {
+ // This just calls balanceBlock() once for each block, using a
+ // pool of worker goroutines.
+ defer timeMe(bal.Logger, "ComputeChangeSets")()
+ bal.setupServiceRoots()
+
+ type balanceTask struct {
+ blkid arvados.SizedDigest
+ blk *BlockState
+ }
+ nWorkers := 1 + runtime.NumCPU()
+ todo := make(chan balanceTask, nWorkers)
+ var wg sync.WaitGroup
+ for i := 0; i < nWorkers; i++ {
+ wg.Add(1)
+ go func() {
+ for work := range todo {
+ bal.balanceBlock(work.blkid, work.blk)
+ }
+ wg.Done()
+ }()
+ }
+ bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
+ todo <- balanceTask{
+ blkid: blkid,
+ blk: blk,
+ }
+ })
+ close(todo)
+ wg.Wait()
+}
+
+func (bal *Balancer) setupServiceRoots() {
+ bal.serviceRoots = make(map[string]string)
+ for _, srv := range bal.KeepServices {
+ bal.serviceRoots[srv.UUID] = srv.UUID
+ }
+}
+
+const (
+ changeStay = iota
+ changePull
+ changeTrash
+ changeNone
+)
+
+var changeName = map[int]string{
+ changeStay: "stay",
+ changePull: "pull",
+ changeTrash: "trash",
+ changeNone: "none",
+}
+
+// balanceBlock compares current state to desired state for a single
+// block, and makes the appropriate ChangeSet calls.
+func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
+ debugf("balanceBlock: %v %+v", blkid, blk)
+ uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
+ hasRepl := make(map[string]Replica, len(bal.serviceRoots))
+ for _, repl := range blk.Replicas {
+ hasRepl[repl.UUID] = repl
+ // TODO: when multiple copies are on one server, use
+ // the oldest one that doesn't have a timestamp
+ // collision with other replicas.
+ }
+ // number of replicas already found in positions better than
+ // the position we're contemplating now.
+ reportedBestRepl := 0
+ // To be safe we assume two replicas with the same Mtime are
+ // in fact the same replica being reported more than
+ // once. len(uniqueBestRepl) is the number of distinct
+ // replicas in the best rendezvous positions we've considered
+ // so far.
+ uniqueBestRepl := make(map[int64]bool, len(bal.serviceRoots))
+ // pulls is the number of Pull changes we have already
+ // requested. (For purposes of deciding whether to Pull to
+ // rendezvous position N, we should assume all pulls we have
+ // requested on rendezvous positions M<N will be successful.)
+ pulls := 0
+ var changes []string
+ for _, uuid := range uuids {
+ change := changeNone
+ srv := bal.KeepServices[uuid]
+ // TODO: request a Touch if Mtime is duplicated.
+ repl, ok := hasRepl[srv.UUID]
+ if ok {
+ // This service has a replica. We should
+ // delete it if [1] we already have enough
+ // distinct replicas in better rendezvous
+ // positions and [2] this replica's Mtime is
+ // distinct from all of the better replicas'
+ // Mtimes.
+ if !srv.ReadOnly &&
+ repl.Mtime < bal.MinMtime &&
+ len(uniqueBestRepl) >= blk.Desired &&
+ !uniqueBestRepl[repl.Mtime] {
+ srv.AddTrash(Trash{
+ SizedDigest: blkid,
+ Mtime: repl.Mtime,
+ })
+ change = changeTrash
+ } else {
+ change = changeStay
+ }
+ uniqueBestRepl[repl.Mtime] = true
+ reportedBestRepl++
+ } else if pulls+reportedBestRepl < blk.Desired &&
+ len(blk.Replicas) > 0 &&
+ !srv.ReadOnly {
+ // This service doesn't have a replica. We
+ // should pull one to this server if we don't
+ // already have enough (existing+requested)
+ // replicas in better rendezvous positions.
+ srv.AddPull(Pull{
+ SizedDigest: blkid,
+ Source: blk.Replicas[0].KeepService,
+ })
+ pulls++
+ change = changePull
+ }
+ if bal.Dumper != nil {
+ changes = append(changes, fmt.Sprintf("%s:%d=%s,%d", srv.ServiceHost, srv.ServicePort, changeName[change], repl.Mtime))
+ }
+ }
+ if bal.Dumper != nil {
+ bal.Dumper.Printf("%s have=%d want=%d %s", blkid, len(blk.Replicas), blk.Desired, strings.Join(changes, " "))
+ }
+}
+
+type blocksNBytes struct {
+ replicas int
+ blocks int
+ bytes int64
+}
+
+func (bb blocksNBytes) String() string {
+ return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
+}
+
+type balancerStats struct {
+ lost, overrep, unref, garbage, underrep, justright blocksNBytes
+ desired, current blocksNBytes
+ pulls, trashes int
+}
+
+func (bal *Balancer) getStatistics() (s balancerStats) {
+ bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
+ surplus := len(blk.Replicas) - blk.Desired
+ bytes := blkid.Size()
+ switch {
+ case len(blk.Replicas) == 0 && blk.Desired > 0:
+ s.lost.replicas -= surplus
+ s.lost.blocks++
+ s.lost.bytes += bytes * int64(-surplus)
+ case len(blk.Replicas) < blk.Desired:
+ s.underrep.replicas -= surplus
+ s.underrep.blocks++
+ s.underrep.bytes += bytes * int64(-surplus)
+ case len(blk.Replicas) > 0 && blk.Desired == 0:
+ counter := &s.garbage
+ for _, r := range blk.Replicas {
+ if r.Mtime >= bal.MinMtime {
+ counter = &s.unref
+ break
+ }
+ }
+ counter.replicas += surplus
+ counter.blocks++
+ counter.bytes += bytes * int64(surplus)
+ case len(blk.Replicas) > blk.Desired:
+ s.overrep.replicas += surplus
+ s.overrep.blocks++
+ s.overrep.bytes += bytes * int64(len(blk.Replicas)-blk.Desired)
+ default:
+ s.justright.replicas += blk.Desired
+ s.justright.blocks++
+ s.justright.bytes += bytes * int64(blk.Desired)
+ }
+
+ if blk.Desired > 0 {
+ s.desired.replicas += blk.Desired
+ s.desired.blocks++
+ s.desired.bytes += bytes * int64(blk.Desired)
+ }
+ if len(blk.Replicas) > 0 {
+ s.current.replicas += len(blk.Replicas)
+ s.current.blocks++
+ s.current.bytes += bytes * int64(len(blk.Replicas))
+ }
+ })
+ for _, srv := range bal.KeepServices {
+ s.pulls += len(srv.ChangeSet.Pulls)
+ s.trashes += len(srv.ChangeSet.Trashes)
+ }
+ return
+}
+
+// PrintStatistics writes statistics about the computed changes to
+// bal.Logger. It should not be called until ComputeChangeSets has
+// finished.
+func (bal *Balancer) PrintStatistics() {
+ s := bal.getStatistics()
+ bal.logf("===")
+ bal.logf("%s lost (0=have<want)", s.lost)
+ bal.logf("%s underreplicated (0<have<want)", s.underrep)
+ bal.logf("%s just right (have=want)", s.justright)
+ bal.logf("%s overreplicated (have>want>0)", s.overrep)
+ bal.logf("%s unreferenced (have>want=0, new)", s.unref)
+ bal.logf("%s garbage (have>want=0, old)", s.garbage)
+ bal.logf("===")
+ bal.logf("%s total commitment (excluding unreferenced)", s.desired)
+ bal.logf("%s total usage", s.current)
+ bal.logf("===")
+ for _, srv := range bal.KeepServices {
+ bal.logf("%s: %v\n", srv, srv.ChangeSet)
+ }
+ bal.logf("===")
+}
+
+// CheckSanityLate checks for configuration and runtime errors after
+// GetCurrentState() and ComputeChangeSets() have finished.
+//
+// If it returns an error, it is dangerous to run any Commit methods.
+func (bal *Balancer) CheckSanityLate() error {
+ if bal.errors != nil {
+ for _, err := range bal.errors {
+ bal.logf("deferred error: %v", err)
+ }
+ return fmt.Errorf("cannot proceed safely after deferred errors")
+ }
+
+ if bal.collScanned == 0 {
+ return fmt.Errorf("received zero collections")
+ }
+
+ anyDesired := false
+ bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
+ if blk.Desired > 0 {
+ anyDesired = true
+ }
+ })
+ if !anyDesired {
+ return fmt.Errorf("zero blocks have desired replication>0")
+ }
+
+ if dr := bal.DefaultReplication; dr < 1 {
+ return fmt.Errorf("Default replication (%d) is less than 1", dr)
+ }
+
+ // TODO: no two services have identical indexes
+ // TODO: no collisions (same md5, different size)
+ return nil
+}
+
+// CommitPulls sends the computed lists of pull requests to the
+// keepstore servers. This has the effect of increasing replication of
+// existing blocks that are either underreplicated or poorly
+// distributed according to rendezvous hashing.
+func (bal *Balancer) CommitPulls(c *arvados.Client) error {
+ return bal.commitAsync(c, "send pull list",
+ func(srv *KeepService) error {
+ return srv.CommitPulls(c)
+ })
+}
+
+// CommitTrash sends the computed lists of trash requests to the
+// keepstore servers. This has the effect of deleting blocks that are
+// overreplicated or unreferenced.
+func (bal *Balancer) CommitTrash(c *arvados.Client) error {
+ return bal.commitAsync(c, "send trash list",
+ func(srv *KeepService) error {
+ return srv.CommitTrash(c)
+ })
+}
+
+func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
+ errs := make(chan error)
+ for _, srv := range bal.KeepServices {
+ go func(srv *KeepService) {
+ var err error
+ defer func() { errs <- err }()
+ label := fmt.Sprintf("%s: %v", srv, label)
+ defer timeMe(bal.Logger, label)()
+ err = f(srv)
+ if err != nil {
+ err = fmt.Errorf("%s: %v", label, err)
+ }
+ }(srv)
+ }
+ var lastErr error
+ for _ = range bal.KeepServices {
+ if err := <-errs; err != nil {
+ bal.logf("%v", err)
+ lastErr = err
+ }
+ }
+ close(errs)
+ return lastErr
+}
+
+func (bal *Balancer) logf(f string, args ...interface{}) {
+ if bal.Logger != nil {
+ bal.Logger.Printf(f, args...)
+ }
+}
diff --git a/services/keep-balance/balance_run_test.go b/services/keep-balance/balance_run_test.go
new file mode 100644
index 0000000..a138d91
--- /dev/null
+++ b/services/keep-balance/balance_run_test.go
@@ -0,0 +1,374 @@
+package main
+
+import (
+ _ "encoding/json"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "log"
+ "net/http"
+ "net/http/httptest"
+ "strings"
+ "sync"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&runSuite{})
+
+type reqTracker struct {
+ reqs []http.Request
+ sync.Mutex
+}
+
+func (rt *reqTracker) Count() int {
+ rt.Lock()
+ defer rt.Unlock()
+ return len(rt.reqs)
+}
+
+func (rt *reqTracker) Add(req *http.Request) int {
+ rt.Lock()
+ defer rt.Unlock()
+ rt.reqs = append(rt.reqs, *req)
+ return len(rt.reqs)
+}
+
+// stubServer is an HTTP transport that intercepts and processes all
+// requests using its own handlers.
+type stubServer struct {
+ mux *http.ServeMux
+ srv *httptest.Server
+ mutex sync.Mutex
+ Requests reqTracker
+ logf func(string, ...interface{})
+}
+
+// Start initializes the stub server and returns an *http.Client that
+// uses the stub server to handle all requests.
+//
+// A stubServer that has been started should eventually be shut down
+// with Close().
+func (s *stubServer) Start() *http.Client {
+ // Set up a config.Client that forwards all requests to s.mux
+ // via s.srv. Test cases will attach handlers to s.mux to get
+ // the desired responses.
+ s.mux = http.NewServeMux()
+ s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ s.mutex.Lock()
+ s.Requests.Add(r)
+ s.mutex.Unlock()
+ w.Header().Set("Content-Type", "application/json")
+ s.mux.ServeHTTP(w, r)
+ }))
+ return &http.Client{Transport: s}
+}
+
+func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) {
+ w := httptest.NewRecorder()
+ s.mux.ServeHTTP(w, req)
+ return &http.Response{
+ StatusCode: w.Code,
+ Status: fmt.Sprintf("%d %s", w.Code, http.StatusText(w.Code)),
+ Header: w.HeaderMap,
+ Body: ioutil.NopCloser(w.Body)}, nil
+}
+
+// Close releases resources used by the server.
+func (s *stubServer) Close() {
+ s.srv.Close()
+}
+
+func (s *stubServer) serveStatic(path, data string) *reqTracker {
+ rt := &reqTracker{}
+ s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
+ rt.Add(r)
+ if r.Body != nil {
+ ioutil.ReadAll(r.Body)
+ r.Body.Close()
+ }
+ io.WriteString(w, data)
+ })
+ return rt
+}
+
+func (s *stubServer) serveCurrentUserAdmin() *reqTracker {
+ return s.serveStatic("/arvados/v1/users/current",
+ `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":true,"is_active":true}`)
+}
+
+func (s *stubServer) serveCurrentUserNotAdmin() *reqTracker {
+ return s.serveStatic("/arvados/v1/users/current",
+ `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":false,"is_active":true}`)
+}
+
+func (s *stubServer) serveDiscoveryDoc() *reqTracker {
+ return s.serveStatic("/discovery/v1/apis/arvados/v1/rest",
+ `{"defaultCollectionReplication":2}`)
+}
+
+func (s *stubServer) serveZeroCollections() *reqTracker {
+ return s.serveStatic("/arvados/v1/collections",
+ `{"items":[],"items_available":0}`)
+}
+
+func (s *stubServer) serveFooBarFileCollections() *reqTracker {
+ rt := &reqTracker{}
+ s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
+ r.ParseForm()
+ rt.Add(r)
+ if strings.Contains(r.Form.Get("filters"), `modified_at`) {
+ io.WriteString(w, `{"items_available":0,"items":[]}`)
+ } else {
+ io.WriteString(w, `{"items_available":2,"items":[
+ {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
+ {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
+ }
+ })
+ return rt
+}
+
+func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
+ rt := &reqTracker{}
+ s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
+ r.ParseForm()
+ rt.Add(r)
+ if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003c="`) {
+ io.WriteString(w, `{"items_available":3,"items":[]}`)
+ } else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e="`) {
+ io.WriteString(w, `{"items_available":0,"items":[]}`)
+ } else {
+ io.WriteString(w, `{"items_available":2,"items":[
+ {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
+ {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
+ }
+ })
+ return rt
+}
+
+func (s *stubServer) serveZeroKeepServices() *reqTracker {
+ return s.serveStatic("/arvados/v1/keep_services",
+ `{"items":[],"items_available":0}`)
+}
+
+func (s *stubServer) serveFourDiskKeepServices() *reqTracker {
+ return s.serveStatic("/arvados/v1/keep_services", `{"items_available":5,"items":[
+ {"uuid":"zzzzz-bi6l4-000000000000000","service_host":"keep0.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
+ {"uuid":"zzzzz-bi6l4-000000000000001","service_host":"keep1.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
+ {"uuid":"zzzzz-bi6l4-000000000000002","service_host":"keep2.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
+ {"uuid":"zzzzz-bi6l4-000000000000003","service_host":"keep3.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
+ {"uuid":"zzzzz-bi6l4-h0a0xwut9qa6g3a","service_host":"keep.zzzzz.arvadosapi.com","service_port":25333,"service_ssl_flag":true,"service_type":"proxy"}]}`)
+}
+
+func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
+ rt := &reqTracker{}
+ s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
+ count := rt.Add(r)
+ if r.Host == "keep0.zzzzz.arvadosapi.com:25107" {
+ io.WriteString(w, "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n")
+ }
+ fmt.Fprintf(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n\n", 12345678+count)
+ })
+ return rt
+}
+
+func (s *stubServer) serveKeepstoreTrash() *reqTracker {
+ return s.serveStatic("/trash", `{}`)
+}
+
+func (s *stubServer) serveKeepstorePull() *reqTracker {
+ return s.serveStatic("/pull", `{}`)
+}
+
+type runSuite struct {
+ stub stubServer
+ config Config
+}
+
+// make a log.Logger that writes to the current test's c.Log().
+func (s *runSuite) logger(c *check.C) *log.Logger {
+ r, w := io.Pipe()
+ go func() {
+ buf := make([]byte, 10000)
+ for {
+ n, err := r.Read(buf)
+ if n > 0 {
+ if buf[n-1] == '\n' {
+ n--
+ }
+ c.Log(string(buf[:n]))
+ }
+ if err != nil {
+ break
+ }
+ }
+ }()
+ return log.New(w, "", log.LstdFlags)
+}
+
+func (s *runSuite) SetUpTest(c *check.C) {
+ s.config = Config{
+ Client: arvados.Client{
+ AuthToken: "xyzzy",
+ APIHost: "zzzzz.arvadosapi.com",
+ Client: s.stub.Start()},
+ KeepServiceTypes: []string{"disk"}}
+ s.stub.serveDiscoveryDoc()
+ s.stub.logf = c.Logf
+}
+
+func (s *runSuite) TearDownTest(c *check.C) {
+ s.stub.Close()
+}
+
+func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
+ opts := RunOptions{
+ CommitPulls: true,
+ CommitTrash: true,
+ Logger: s.logger(c),
+ }
+ s.stub.serveCurrentUserAdmin()
+ s.stub.serveZeroCollections()
+ s.stub.serveFourDiskKeepServices()
+ s.stub.serveKeepstoreIndexFoo4Bar1()
+ trashReqs := s.stub.serveKeepstoreTrash()
+ pullReqs := s.stub.serveKeepstorePull()
+ err := (&Balancer{}).Run(s.config, opts)
+ c.Check(err, check.ErrorMatches, "received zero collections")
+ c.Check(trashReqs.Count(), check.Equals, 4)
+ c.Check(pullReqs.Count(), check.Equals, 0)
+}
+
+func (s *runSuite) TestServiceTypes(c *check.C) {
+ opts := RunOptions{
+ CommitPulls: true,
+ CommitTrash: true,
+ Logger: s.logger(c),
+ }
+ s.config.KeepServiceTypes = []string{"unlisted-type"}
+ s.stub.serveCurrentUserAdmin()
+ s.stub.serveFooBarFileCollections()
+ s.stub.serveFourDiskKeepServices()
+ indexReqs := s.stub.serveKeepstoreIndexFoo4Bar1()
+ trashReqs := s.stub.serveKeepstoreTrash()
+ err := (&Balancer{}).Run(s.config, opts)
+ c.Check(err, check.IsNil)
+ c.Check(indexReqs.Count(), check.Equals, 0)
+ c.Check(trashReqs.Count(), check.Equals, 0)
+}
+
+func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
+ opts := RunOptions{
+ CommitPulls: true,
+ CommitTrash: true,
+ Logger: s.logger(c),
+ }
+ s.stub.serveCurrentUserNotAdmin()
+ s.stub.serveZeroCollections()
+ s.stub.serveFourDiskKeepServices()
+ trashReqs := s.stub.serveKeepstoreTrash()
+ pullReqs := s.stub.serveKeepstorePull()
+ err := (&Balancer{}).Run(s.config, opts)
+ c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
+ c.Check(trashReqs.Count(), check.Equals, 0)
+ c.Check(pullReqs.Count(), check.Equals, 0)
+}
+
+func (s *runSuite) TestDetectSkippedCollections(c *check.C) {
+ opts := RunOptions{
+ CommitPulls: true,
+ CommitTrash: true,
+ Logger: s.logger(c),
+ }
+ s.stub.serveCurrentUserAdmin()
+ s.stub.serveCollectionsButSkipOne()
+ s.stub.serveFourDiskKeepServices()
+ s.stub.serveKeepstoreIndexFoo4Bar1()
+ trashReqs := s.stub.serveKeepstoreTrash()
+ pullReqs := s.stub.serveKeepstorePull()
+ err := (&Balancer{}).Run(s.config, opts)
+ c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`)
+ c.Check(trashReqs.Count(), check.Equals, 4)
+ c.Check(pullReqs.Count(), check.Equals, 0)
+}
+
+func (s *runSuite) TestDryRun(c *check.C) {
+ opts := RunOptions{
+ CommitPulls: false,
+ CommitTrash: false,
+ Logger: s.logger(c),
+ }
+ s.stub.serveCurrentUserAdmin()
+ s.stub.serveFooBarFileCollections()
+ s.stub.serveFourDiskKeepServices()
+ s.stub.serveKeepstoreIndexFoo4Bar1()
+ trashReqs := s.stub.serveKeepstoreTrash()
+ pullReqs := s.stub.serveKeepstorePull()
+ var bal Balancer
+ err := bal.Run(s.config, opts)
+ c.Check(err, check.IsNil)
+ c.Check(trashReqs.Count(), check.Equals, 0)
+ c.Check(pullReqs.Count(), check.Equals, 0)
+ stats := bal.getStatistics()
+ c.Check(stats.pulls, check.Not(check.Equals), 0)
+ c.Check(stats.underrep.replicas, check.Not(check.Equals), 0)
+ c.Check(stats.overrep.replicas, check.Not(check.Equals), 0)
+}
+
+func (s *runSuite) TestCommit(c *check.C) {
+ opts := RunOptions{
+ CommitPulls: true,
+ CommitTrash: true,
+ Logger: s.logger(c),
+ Dumper: s.logger(c),
+ }
+ s.stub.serveCurrentUserAdmin()
+ s.stub.serveFooBarFileCollections()
+ s.stub.serveFourDiskKeepServices()
+ s.stub.serveKeepstoreIndexFoo4Bar1()
+ trashReqs := s.stub.serveKeepstoreTrash()
+ pullReqs := s.stub.serveKeepstorePull()
+ var bal Balancer
+ err := bal.Run(s.config, opts)
+ c.Check(err, check.IsNil)
+ c.Check(trashReqs.Count(), check.Equals, 8)
+ c.Check(pullReqs.Count(), check.Equals, 4)
+ stats := bal.getStatistics()
+ // "foo" block is overreplicated by 2
+ c.Check(stats.trashes, check.Equals, 2)
+ // "bar" block is underreplicated by 1, and its only copy is
+ // in a poor rendezvous position
+ c.Check(stats.pulls, check.Equals, 2)
+}
+
+func (s *runSuite) TestRunForever(c *check.C) {
+ opts := RunOptions{
+ CommitPulls: true,
+ CommitTrash: true,
+ Logger: s.logger(c),
+ Dumper: s.logger(c),
+ }
+ s.stub.serveCurrentUserAdmin()
+ s.stub.serveFooBarFileCollections()
+ s.stub.serveFourDiskKeepServices()
+ s.stub.serveKeepstoreIndexFoo4Bar1()
+ trashReqs := s.stub.serveKeepstoreTrash()
+ pullReqs := s.stub.serveKeepstorePull()
+
+ stop := make(chan interface{})
+ s.config.RunPeriod = arvados.Duration(time.Millisecond)
+ go RunForever(s.config, opts, stop)
+
+ // Each run should send 4 clear trash lists + 4 pull lists + 4
+ // trash lists. We should complete four runs in much less than
+ // a second.
+ for t0 := time.Now(); pullReqs.Count() < 16 && time.Since(t0) < 10*time.Second; {
+ time.Sleep(time.Millisecond)
+ }
+ stop <- true
+ c.Check(pullReqs.Count() >= 16, check.Equals, true)
+ c.Check(trashReqs.Count(), check.Equals, 2*pullReqs.Count())
+}
diff --git a/services/keep-balance/balance_test.go b/services/keep-balance/balance_test.go
new file mode 100644
index 0000000..682a5fb
--- /dev/null
+++ b/services/keep-balance/balance_test.go
@@ -0,0 +1,255 @@
+package main
+
+import (
+ "crypto/md5"
+ "fmt"
+ "sort"
+ "strconv"
+ "testing"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+
+ check "gopkg.in/check.v1"
+)
+
+// Test with Gocheck
+func Test(t *testing.T) {
+ check.TestingT(t)
+}
+
+var _ = check.Suite(&balancerSuite{})
+
+type balancerSuite struct {
+ Balancer
+ srvs []*KeepService
+ blks map[string]tester
+ knownRendezvous [][]int
+ signatureTTL int64
+}
+
+const (
+ // index into knownRendezvous
+ known0 = 0
+)
+
+type slots []int
+
+type tester struct {
+ known int
+ desired int
+ current slots
+ timestamps []int64
+ shouldPull slots
+ shouldTrash slots
+}
+
+func (bal *balancerSuite) SetUpSuite(c *check.C) {
+ bal.knownRendezvous = nil
+ for _, str := range []string{
+ "3eab2d5fc9681074",
+ "097dba52e648f1c3",
+ "c5b4e023f8a7d691",
+ "9d81c02e76a3bf54",
+ } {
+ var slots []int
+ for _, c := range []byte(str) {
+ pos, _ := strconv.ParseUint(string(c), 16, 4)
+ slots = append(slots, int(pos))
+ }
+ bal.knownRendezvous = append(bal.knownRendezvous, slots)
+ }
+
+ bal.signatureTTL = 3600
+}
+
+func (bal *balancerSuite) SetUpTest(c *check.C) {
+ bal.srvs = make([]*KeepService, 16)
+ bal.KeepServices = make(map[string]*KeepService)
+ for i := range bal.srvs {
+ srv := &KeepService{
+ KeepService: arvados.KeepService{
+ UUID: fmt.Sprintf("zzzzz-bi6l4-%015x", i),
+ },
+ }
+ bal.srvs[i] = srv
+ bal.KeepServices[srv.UUID] = srv
+ }
+
+ bal.MinMtime = time.Now().Unix() - bal.signatureTTL
+}
+
+func (bal *balancerSuite) TestPerfect(c *check.C) {
+ bal.try(c, tester{
+ desired: 2,
+ current: slots{0, 1},
+ shouldPull: nil,
+ shouldTrash: nil})
+}
+
+func (bal *balancerSuite) TestDecreaseRepl(c *check.C) {
+ bal.try(c, tester{
+ desired: 2,
+ current: slots{0, 2, 1},
+ shouldTrash: slots{2}})
+}
+
+func (bal *balancerSuite) TestDecreaseReplToZero(c *check.C) {
+ bal.try(c, tester{
+ desired: 0,
+ current: slots{0, 1, 3},
+ shouldTrash: slots{0, 1, 3}})
+}
+
+func (bal *balancerSuite) TestIncreaseRepl(c *check.C) {
+ bal.try(c, tester{
+ desired: 4,
+ current: slots{0, 1},
+ shouldPull: slots{2, 3}})
+}
+
+func (bal *balancerSuite) TestSkipReadonly(c *check.C) {
+ bal.srvList(0, slots{3})[0].ReadOnly = true
+ bal.try(c, tester{
+ desired: 4,
+ current: slots{0, 1},
+ shouldPull: slots{2, 4}})
+}
+
+func (bal *balancerSuite) TestFixUnbalanced(c *check.C) {
+ bal.try(c, tester{
+ desired: 2,
+ current: slots{2, 0},
+ shouldPull: slots{1}})
+ bal.try(c, tester{
+ desired: 2,
+ current: slots{2, 7},
+ shouldPull: slots{0, 1}})
+ // if only one of the pulls succeeds, we'll see this next:
+ bal.try(c, tester{
+ desired: 2,
+ current: slots{2, 1, 7},
+ shouldPull: slots{0},
+ shouldTrash: slots{7}})
+ // if both pulls succeed, we'll see this next:
+ bal.try(c, tester{
+ desired: 2,
+ current: slots{2, 0, 1, 7},
+ shouldTrash: slots{2, 7}})
+
+ // unbalanced + excessive replication => pull + trash
+ bal.try(c, tester{
+ desired: 2,
+ current: slots{2, 5, 7},
+ shouldPull: slots{0, 1},
+ shouldTrash: slots{7}})
+}
+
+func (bal *balancerSuite) TestIncreaseReplTimestampCollision(c *check.C) {
+ // For purposes of increasing replication, we assume identical
+ // replicas are distinct.
+ bal.try(c, tester{
+ desired: 4,
+ current: slots{0, 1},
+ timestamps: []int64{12345678, 12345678},
+ shouldPull: slots{2, 3}})
+}
+
+func (bal *balancerSuite) TestDecreaseReplTimestampCollision(c *check.C) {
+ // For purposes of decreasing replication, we assume identical
+ // replicas are NOT distinct.
+ bal.try(c, tester{
+ desired: 2,
+ current: slots{0, 1, 2},
+ timestamps: []int64{12345678, 12345678, 12345678}})
+ bal.try(c, tester{
+ desired: 2,
+ current: slots{0, 1, 2},
+ timestamps: []int64{12345678, 10000000, 10000000}})
+}
+
+func (bal *balancerSuite) TestDecreaseReplBlockTooNew(c *check.C) {
+ oldTime := bal.MinMtime - 3600
+ newTime := bal.MinMtime + 3600
+ // The excess replica is too new to delete.
+ bal.try(c, tester{
+ desired: 2,
+ current: slots{0, 1, 2},
+ timestamps: []int64{oldTime, newTime, newTime + 1}})
+ // The best replicas are too new to delete, but the excess
+ // replica is old enough.
+ bal.try(c, tester{
+ desired: 2,
+ current: slots{0, 1, 2},
+ timestamps: []int64{newTime, newTime + 1, oldTime},
+ shouldTrash: slots{2}})
+}
+
+// Clear all servers' changesets, balance a single block, and verify
+// the appropriate changes for that block have been added to the
+// changesets.
+func (bal *balancerSuite) try(c *check.C, t tester) {
+ bal.setupServiceRoots()
+ blk := &BlockState{
+ Desired: t.desired,
+ Replicas: bal.replList(t.known, t.current)}
+ for i, t := range t.timestamps {
+ blk.Replicas[i].Mtime = t
+ }
+ for _, srv := range bal.srvs {
+ srv.ChangeSet = &ChangeSet{}
+ }
+ bal.balanceBlock(knownBlkid(t.known), blk)
+
+ var didPull, didTrash slots
+ for i, srv := range bal.srvs {
+ var slot int
+ for probeOrder, srvNum := range bal.knownRendezvous[t.known] {
+ if srvNum == i {
+ slot = probeOrder
+ }
+ }
+ for _, pull := range srv.Pulls {
+ didPull = append(didPull, slot)
+ c.Check(pull.SizedDigest, check.Equals, knownBlkid(t.known))
+ }
+ for _, trash := range srv.Trashes {
+ didTrash = append(didTrash, slot)
+ c.Check(trash.SizedDigest, check.Equals, knownBlkid(t.known))
+ }
+ }
+
+ for _, list := range []slots{didPull, didTrash, t.shouldPull, t.shouldTrash} {
+ sort.Sort(sort.IntSlice(list))
+ }
+ c.Check(didPull, check.DeepEquals, t.shouldPull)
+ c.Check(didTrash, check.DeepEquals, t.shouldTrash)
+}
+
+// srvList returns the KeepServices, sorted in rendezvous order and
+// then selected by idx. For example, srvList(3, 0, 1, 4) returns the
+// the first-, second-, and fifth-best servers for storing
+// bal.knownBlkid(3).
+func (bal *balancerSuite) srvList(knownBlockID int, order slots) (srvs []*KeepService) {
+ for _, i := range order {
+ srvs = append(srvs, bal.srvs[bal.knownRendezvous[knownBlockID][i]])
+ }
+ return
+}
+
+// replList is like srvList but returns an "existing replicas" slice,
+// suitable for a BlockState test fixture.
+func (bal *balancerSuite) replList(knownBlockID int, order slots) (repls []Replica) {
+ mtime := time.Now().Unix() - bal.signatureTTL - 86400
+ for _, srv := range bal.srvList(knownBlockID, order) {
+ repls = append(repls, Replica{srv, mtime})
+ mtime++
+ }
+ return
+}
+
+// generate the same data hashes that are tested in
+// sdk/go/keepclient/root_sorter_test.go
+func knownBlkid(i int) arvados.SizedDigest {
+ return arvados.SizedDigest(fmt.Sprintf("%x+64", md5.Sum([]byte(fmt.Sprintf("%064x", i)))))
+}
diff --git a/services/keep-balance/block_state.go b/services/keep-balance/block_state.go
new file mode 100644
index 0000000..d607386
--- /dev/null
+++ b/services/keep-balance/block_state.go
@@ -0,0 +1,95 @@
+package main
+
+import (
+ "sync"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// Replica is a file on disk (or object in an S3 bucket, or blob in an
+// Azure storage container, etc.) as reported in a keepstore index
+// response.
+type Replica struct {
+ *KeepService
+ Mtime int64
+}
+
+// BlockState indicates the number of desired replicas (according to
+// the collections we know about) and the replicas actually stored
+// (according to the keepstore indexes we know about).
+type BlockState struct {
+ Replicas []Replica
+ Desired int
+}
+
+func (bs *BlockState) addReplica(r Replica) {
+ bs.Replicas = append(bs.Replicas, r)
+}
+
+func (bs *BlockState) increaseDesired(n int) {
+ if bs.Desired < n {
+ bs.Desired = n
+ }
+}
+
+// BlockStateMap is a goroutine-safe wrapper around a
+// map[arvados.SizedDigest]*BlockState.
+type BlockStateMap struct {
+ entries map[arvados.SizedDigest]*BlockState
+ mutex sync.Mutex
+}
+
+// NewBlockStateMap returns a newly allocated BlockStateMap.
+func NewBlockStateMap() *BlockStateMap {
+ return &BlockStateMap{
+ entries: make(map[arvados.SizedDigest]*BlockState),
+ }
+}
+
+// return a BlockState entry, allocating a new one if needed. (Private
+// method: not goroutine-safe.)
+func (bsm *BlockStateMap) get(blkid arvados.SizedDigest) *BlockState {
+ // TODO? Allocate BlockState structs a slice at a time,
+ // instead of one at a time.
+ blk := bsm.entries[blkid]
+ if blk == nil {
+ blk = &BlockState{}
+ bsm.entries[blkid] = blk
+ }
+ return blk
+}
+
+// Apply runs f on each entry in the map.
+func (bsm *BlockStateMap) Apply(f func(arvados.SizedDigest, *BlockState)) {
+ bsm.mutex.Lock()
+ defer bsm.mutex.Unlock()
+
+ for blkid, blk := range bsm.entries {
+ f(blkid, blk)
+ }
+}
+
+// AddReplicas updates the map to indicate srv has a replica of each
+// block in idx.
+func (bsm *BlockStateMap) AddReplicas(srv *KeepService, idx []arvados.KeepServiceIndexEntry) {
+ bsm.mutex.Lock()
+ defer bsm.mutex.Unlock()
+
+ for _, ent := range idx {
+ bsm.get(ent.SizedDigest).addReplica(Replica{
+ KeepService: srv,
+ Mtime: ent.Mtime,
+ })
+ }
+}
+
+// IncreaseDesired updates the map to indicate the desired replication
+// for the given blocks is at least n.
+func (bsm *BlockStateMap) IncreaseDesired(n int, blocks []arvados.SizedDigest) {
+ bsm.mutex.Lock()
+ defer bsm.mutex.Unlock()
+
+ for _, blkid := range blocks {
+ bsm.get(blkid).increaseDesired(n)
+ }
+}
diff --git a/services/keep-balance/change_set.go b/services/keep-balance/change_set.go
new file mode 100644
index 0000000..417ea7f
--- /dev/null
+++ b/services/keep-balance/change_set.go
@@ -0,0 +1,75 @@
+package main
+
+import (
+ "encoding/json"
+ "fmt"
+ "sync"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// Pull is a request to retrieve a block from a remote server, and
+// store it locally.
+type Pull struct {
+ arvados.SizedDigest
+ Source *KeepService
+}
+
+// MarshalJSON formats a pull request the way keepstore wants to see
+// it.
+func (p Pull) MarshalJSON() ([]byte, error) {
+ type KeepstorePullRequest struct {
+ Locator string `json:"locator"`
+ Servers []string `json:"servers"`
+ }
+ return json.Marshal(KeepstorePullRequest{
+ Locator: string(p.SizedDigest[:32]),
+ Servers: []string{p.Source.URLBase()}})
+}
+
+// Trash is a request to delete a block.
+type Trash struct {
+ arvados.SizedDigest
+ Mtime int64
+}
+
+// MarshalJSON formats a trash request the way keepstore wants to see
+// it, i.e., as a bare locator with no +size hint.
+func (t Trash) MarshalJSON() ([]byte, error) {
+ type KeepstoreTrashRequest struct {
+ Locator string `json:"locator"`
+ BlockMtime int64 `json:"block_mtime"`
+ }
+ return json.Marshal(KeepstoreTrashRequest{
+ Locator: string(t.SizedDigest[:32]),
+ BlockMtime: t.Mtime})
+}
+
+// ChangeSet is a set of change requests that will be sent to a
+// keepstore server.
+type ChangeSet struct {
+ Pulls []Pull
+ Trashes []Trash
+ mutex sync.Mutex
+}
+
+// AddPull adds a Pull operation.
+func (cs *ChangeSet) AddPull(p Pull) {
+ cs.mutex.Lock()
+ cs.Pulls = append(cs.Pulls, p)
+ cs.mutex.Unlock()
+}
+
+// AddTrash adds a Trash operation
+func (cs *ChangeSet) AddTrash(t Trash) {
+ cs.mutex.Lock()
+ cs.Trashes = append(cs.Trashes, t)
+ cs.mutex.Unlock()
+}
+
+// String implements fmt.Stringer.
+func (cs *ChangeSet) String() string {
+ cs.mutex.Lock()
+ defer cs.mutex.Unlock()
+ return fmt.Sprintf("ChangeSet{Pulls:%d, Trashes:%d}", len(cs.Pulls), len(cs.Trashes))
+}
diff --git a/services/keep-balance/change_set_test.go b/services/keep-balance/change_set_test.go
new file mode 100644
index 0000000..b5dcb5c
--- /dev/null
+++ b/services/keep-balance/change_set_test.go
@@ -0,0 +1,35 @@
+package main
+
+import (
+ "encoding/json"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&changeSetSuite{})
+
+type changeSetSuite struct{}
+
+func (s *changeSetSuite) TestJSONFormat(c *check.C) {
+ srv := &KeepService{
+ KeepService: arvados.KeepService{
+ UUID: "zzzzz-bi6l4-000000000000001",
+ ServiceType: "disk",
+ ServiceSSLFlag: false,
+ ServiceHost: "keep1.zzzzz.arvadosapi.com",
+ ServicePort: 25107}}
+
+ buf, err := json.Marshal([]Pull{{
+ SizedDigest: arvados.SizedDigest("acbd18db4cc2f85cedef654fccc4a4d8+3"),
+ Source: srv}})
+ c.Check(err, check.IsNil)
+ c.Check(string(buf), check.Equals, `[{"locator":"acbd18db4cc2f85cedef654fccc4a4d8","servers":["http://keep1.zzzzz.arvadosapi.com:25107"]}]`)
+
+ buf, err = json.Marshal([]Trash{{
+ SizedDigest: arvados.SizedDigest("acbd18db4cc2f85cedef654fccc4a4d8+3"),
+ Mtime: 123456789}})
+ c.Check(err, check.IsNil)
+ c.Check(string(buf), check.Equals, `[{"locator":"acbd18db4cc2f85cedef654fccc4a4d8","block_mtime":123456789}]`)
+}
diff --git a/services/keep-balance/collection.go b/services/keep-balance/collection.go
new file mode 100644
index 0000000..e6a1f08
--- /dev/null
+++ b/services/keep-balance/collection.go
@@ -0,0 +1,95 @@
+package main
+
+import (
+ "fmt"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int, error) {
+ var page arvados.CollectionList
+ var zero int
+ params.Limit = &zero
+ err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
+ return page.ItemsAvailable, err
+}
+
+// EachCollection calls f once for every readable
+// collection. EachCollection stops if it encounters an error, such as
+// f returning a non-nil error.
+//
+// The progress function is called periodically with done (number of
+// times f has been called) and total (number of times f is expected
+// to be called).
+func EachCollection(c *arvados.Client, f func(arvados.Collection) error, progress func(done, total int)) error {
+ if progress == nil {
+ progress = func(_, _ int) {}
+ }
+
+ expectCount, err := countCollections(c, arvados.ResourceListParams{})
+ if err != nil {
+ return err
+ }
+
+ limit := 1000
+ params := arvados.ResourceListParams{
+ Limit: &limit,
+ Order: "modified_at, uuid",
+ Select: []string{"uuid", "manifest_text", "modified_at", "portable_data_hash", "replication_desired"},
+ }
+ var last arvados.Collection
+ var filterTime time.Time
+ callCount := 0
+ for {
+ progress(callCount, expectCount)
+ var page arvados.CollectionList
+ err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
+ if err != nil {
+ return err
+ }
+ for _, coll := range page.Items {
+ if last.ModifiedAt != nil && *last.ModifiedAt == *coll.ModifiedAt && last.UUID >= coll.UUID {
+ continue
+ }
+ callCount++
+ err = f(coll)
+ if err != nil {
+ return err
+ }
+ last = coll
+ }
+ if last.ModifiedAt == nil || *last.ModifiedAt == filterTime {
+ if page.ItemsAvailable > len(page.Items) {
+ // TODO: use "mtime=X && UUID>Y"
+ // filters to get all collections with
+ // this timestamp, then use "mtime>X"
+ // to get the next timestamp.
+ return fmt.Errorf("BUG: Received an entire page with the same modified_at timestamp (%v), cannot make progress", filterTime)
+ }
+ break
+ }
+ filterTime = *last.ModifiedAt
+ params.Filters = []arvados.Filter{{
+ Attr: "modified_at",
+ Operator: ">=",
+ Operand: filterTime,
+ }, {
+ Attr: "uuid",
+ Operator: "!=",
+ Operand: last.UUID,
+ }}
+ }
+ progress(callCount, expectCount)
+
+ if checkCount, err := countCollections(c, arvados.ResourceListParams{Filters: []arvados.Filter{{
+ Attr: "modified_at",
+ Operator: "<=",
+ Operand: filterTime}}}); err != nil {
+ return err
+ } else if callCount < checkCount {
+ return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, filterTime, checkCount)
+ }
+
+ return nil
+}
diff --git a/services/keep-balance/integration_test.go b/services/keep-balance/integration_test.go
new file mode 100644
index 0000000..b090614
--- /dev/null
+++ b/services/keep-balance/integration_test.go
@@ -0,0 +1,92 @@
+package main
+
+import (
+ "bytes"
+ "log"
+ "net/http"
+ "os"
+ "strings"
+ "testing"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&integrationSuite{})
+
+type integrationSuite struct {
+ config Config
+ keepClient *keepclient.KeepClient
+}
+
+func (s *integrationSuite) SetUpSuite(c *check.C) {
+ if testing.Short() {
+ c.Skip("-short")
+ }
+ arvadostest.ResetEnv()
+ arvadostest.StartAPI()
+ arvadostest.StartKeep(4, true)
+
+ arv, err := arvadosclient.MakeArvadosClient()
+ arv.ApiToken = arvadostest.DataManagerToken
+ c.Assert(err, check.IsNil)
+ s.keepClient = &keepclient.KeepClient{
+ Arvados: &arv,
+ Client: &http.Client{},
+ }
+ c.Assert(s.keepClient.DiscoverKeepServers(), check.IsNil)
+ s.putReplicas(c, "foo", 4)
+ s.putReplicas(c, "bar", 1)
+}
+
+func (s *integrationSuite) putReplicas(c *check.C, data string, replicas int) {
+ s.keepClient.Want_replicas = replicas
+ _, _, err := s.keepClient.PutB([]byte(data))
+ c.Assert(err, check.IsNil)
+}
+
+func (s *integrationSuite) TearDownSuite(c *check.C) {
+ if testing.Short() {
+ c.Skip("-short")
+ }
+ arvadostest.StopKeep(4)
+ arvadostest.StopAPI()
+}
+
+func (s *integrationSuite) SetUpTest(c *check.C) {
+ s.config = Config{
+ Client: arvados.Client{
+ APIHost: os.Getenv("ARVADOS_API_HOST"),
+ AuthToken: arvadostest.DataManagerToken,
+ Insecure: true,
+ },
+ KeepServiceTypes: []string{"disk"},
+ }
+}
+
+func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
+ var logBuf *bytes.Buffer
+ for iter := 0; iter < 20; iter++ {
+ logBuf := &bytes.Buffer{}
+ opts := RunOptions{
+ CommitPulls: true,
+ CommitTrash: true,
+ Logger: log.New(logBuf, "", log.LstdFlags),
+ }
+ err := (&Balancer{}).Run(s.config, opts)
+ c.Check(err, check.IsNil)
+ if iter == 0 {
+ c.Check(logBuf.String(), check.Matches, `(?ms).*ChangeSet{Pulls:1.*`)
+ c.Check(logBuf.String(), check.Not(check.Matches), `(?ms).*ChangeSet{.*Trashes:[^0]}*`)
+ } else if strings.Contains(logBuf.String(), "ChangeSet{Pulls:0") {
+ break
+ }
+ time.Sleep(200 * time.Millisecond)
+ }
+ c.Check(logBuf.String(), check.Not(check.Matches), `(?ms).*0 replicas (0 blocks, 0 bytes) underreplicated.*`)
+}
diff --git a/services/keep-balance/keep_service.go b/services/keep-balance/keep_service.go
new file mode 100644
index 0000000..f65355d
--- /dev/null
+++ b/services/keep-balance/keep_service.go
@@ -0,0 +1,76 @@
+package main
+
+import (
+ "encoding/json"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/http"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// KeepService represents a keepstore server that is being rebalanced.
+type KeepService struct {
+ arvados.KeepService
+ *ChangeSet
+}
+
+// String implements fmt.Stringer.
+func (srv *KeepService) String() string {
+ return fmt.Sprintf("%s (%s:%d, %s)", srv.UUID, srv.ServiceHost, srv.ServicePort, srv.ServiceType)
+}
+
+var ksSchemes = map[bool]string{false: "http", true: "https"}
+
+// URLBase returns scheme://host:port for this server.
+func (srv *KeepService) URLBase() string {
+ return fmt.Sprintf("%s://%s:%d", ksSchemes[srv.ServiceSSLFlag], srv.ServiceHost, srv.ServicePort)
+}
+
+// CommitPulls sends the current list of pull requests to the storage
+// server (even if the list is empty).
+func (srv *KeepService) CommitPulls(c *arvados.Client) error {
+ return srv.put(c, "pull", srv.ChangeSet.Pulls)
+}
+
+// CommitTrash sends the current list of trash requests to the storage
+// server (even if the list is empty).
+func (srv *KeepService) CommitTrash(c *arvados.Client) error {
+ return srv.put(c, "trash", srv.ChangeSet.Trashes)
+}
+
+// Perform a PUT request at path, with data (as JSON) in the request
+// body.
+func (srv *KeepService) put(c *arvados.Client, path string, data interface{}) error {
+ // We'll start a goroutine to do the JSON encoding, so we can
+ // stream it to the http client through a Pipe, rather than
+ // keeping the entire encoded version in memory.
+ jsonR, jsonW := io.Pipe()
+
+ // errC communicates any encoding errors back to our main
+ // goroutine.
+ errC := make(chan error, 1)
+
+ go func() {
+ enc := json.NewEncoder(jsonW)
+ errC <- enc.Encode(data)
+ jsonW.Close()
+ }()
+
+ url := srv.URLBase() + "/" + path
+ req, err := http.NewRequest("PUT", url, ioutil.NopCloser(jsonR))
+ if err != nil {
+ return fmt.Errorf("building request for %s: %v", url, err)
+ }
+ err = c.DoAndDecode(nil, req)
+
+ // If there was an error encoding the request body, report
+ // that instead of the response: obviously we won't get a
+ // useful response if our request wasn't properly encoded.
+ if encErr := <-errC; encErr != nil {
+ return fmt.Errorf("encoding data for %s: %v", url, encErr)
+ }
+
+ return err
+}
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
new file mode 100644
index 0000000..42a8d63
--- /dev/null
+++ b/services/keep-balance/main.go
@@ -0,0 +1,156 @@
+package main
+
+import (
+ "encoding/json"
+ "flag"
+ "io/ioutil"
+ "log"
+ "os"
+ "os/signal"
+ "syscall"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// Config specifies site configuration, like API credentials and the
+// choice of which servers are to be balanced.
+//
+// Config is loaded from a JSON config file (see usage()).
+type Config struct {
+ // Arvados API endpoint and credentials.
+ Client arvados.Client
+
+ // List of service types (e.g., "disk") to balance.
+ KeepServiceTypes []string
+
+ KeepServiceList arvados.KeepServiceList
+
+ // How often to check
+ RunPeriod arvados.Duration
+}
+
+// RunOptions controls runtime behavior. The flags/options that belong
+// here are the ones that are useful for interactive use. For example,
+// "CommitTrash" is a runtime option rather than a config item because
+// it invokes a troubleshooting feature rather than expressing how
+// balancing is meant to be done at a given site.
+//
+// RunOptions fields are controlled by command line flags.
+type RunOptions struct {
+ Once bool
+ CommitPulls bool
+ CommitTrash bool
+ Logger *log.Logger
+ Dumper *log.Logger
+}
+
+var debugf = func(string, ...interface{}) {}
+
+func main() {
+ var config Config
+ var runOptions RunOptions
+
+ configPath := flag.String("config", "",
+ "`path` of json configuration file")
+ serviceListPath := flag.String("config.KeepServiceList", "",
+ "`path` of json file with list of keep services to balance, as given by \"arv keep_service list\" "+
+ "(default: config[\"KeepServiceList\"], or if none given, get all available services and filter by config[\"KeepServiceTypes\"])")
+ flag.BoolVar(&runOptions.Once, "once", false,
+ "balance once and then exit")
+ flag.BoolVar(&runOptions.CommitPulls, "commit-pulls", false,
+ "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
+ flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
+ "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
+ dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout")
+ debugFlag := flag.Bool("debug", false, "enable debug messages")
+ flag.Usage = usage
+ flag.Parse()
+
+ if *configPath == "" {
+ log.Fatal("You must specify a config file (see `keep-balance -help`)")
+ }
+ mustReadJSON(&config, *configPath)
+ if *serviceListPath != "" {
+ mustReadJSON(&config.KeepServiceList, *serviceListPath)
+ }
+
+ if *debugFlag {
+ debugf = log.Printf
+ if j, err := json.Marshal(config); err != nil {
+ log.Fatal(err)
+ } else {
+ log.Printf("config is %s", j)
+ }
+ }
+ if *dumpFlag {
+ runOptions.Dumper = log.New(os.Stdout, "", log.LstdFlags)
+ }
+ err := CheckConfig(config, runOptions)
+ if err != nil {
+ // (don't run)
+ } else if runOptions.Once {
+ err = (&Balancer{}).Run(config, runOptions)
+ } else {
+ err = RunForever(config, runOptions, nil)
+ }
+ if err != nil {
+ log.Fatal(err)
+ }
+}
+
+func mustReadJSON(dst interface{}, path string) {
+ if buf, err := ioutil.ReadFile(path); err != nil {
+ log.Fatalf("Reading %q: %v", path, err)
+ } else if err = json.Unmarshal(buf, dst); err != nil {
+ log.Fatalf("Decoding %q: %v", path, err)
+ }
+}
+
+// RunForever runs forever, or (for testing purposes) until the given
+// stop channel is ready to receive.
+func RunForever(config Config, runOptions RunOptions, stop <-chan interface{}) error {
+ if runOptions.Logger == nil {
+ runOptions.Logger = log.New(os.Stderr, "", log.LstdFlags)
+ }
+ logger := runOptions.Logger
+
+ ticker := time.NewTicker(time.Duration(config.RunPeriod))
+
+ // The unbuffered channel here means we only hear SIGUSR1 if
+ // it arrives while we're waiting in select{}.
+ sigUSR1 := make(chan os.Signal)
+ signal.Notify(sigUSR1, syscall.SIGUSR1)
+
+ logger.Printf("starting up: will scan every %v and on SIGUSR1", config.RunPeriod)
+
+ for {
+ if !runOptions.CommitPulls && !runOptions.CommitTrash {
+ logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
+ logger.Print("======= Consider using -commit-pulls and -commit-trash flags.")
+ }
+
+ err := (&Balancer{}).Run(config, runOptions)
+ if err != nil {
+ logger.Print("run failed: ", err)
+ } else {
+ logger.Print("run succeeded")
+ }
+
+ select {
+ case <-stop:
+ signal.Stop(sigUSR1)
+ return nil
+ case <-ticker.C:
+ logger.Print("timer went off")
+ case <-sigUSR1:
+ logger.Print("received SIGUSR1, resetting timer")
+ // Reset the timer so we don't start the N+1st
+ // run too soon after the Nth run is triggered
+ // by SIGUSR1.
+ ticker.Stop()
+ ticker = time.NewTicker(time.Duration(config.RunPeriod))
+ }
+ logger.Print("starting next run")
+ }
+}
diff --git a/services/keep-balance/main_test.go b/services/keep-balance/main_test.go
new file mode 100644
index 0000000..4a56098
--- /dev/null
+++ b/services/keep-balance/main_test.go
@@ -0,0 +1,43 @@
+package main
+
+import (
+ "encoding/json"
+ "time"
+
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&mainSuite{})
+
+type mainSuite struct{}
+
+func (s *mainSuite) TestExampleJSON(c *check.C) {
+ var config Config
+ c.Check(json.Unmarshal(exampleConfigFile, &config), check.IsNil)
+ c.Check(config.KeepServiceTypes, check.DeepEquals, []string{"disk"})
+ c.Check(config.Client.AuthToken, check.Equals, "xyzzy")
+ c.Check(time.Duration(config.RunPeriod), check.Equals, 600*time.Second)
+}
+
+func (s *mainSuite) TestConfigJSONWithKeepServiceList(c *check.C) {
+ var config Config
+ c.Check(json.Unmarshal([]byte(`
+ {
+ "Client": {
+ "APIHost": "zzzzz.arvadosapi.com:443",
+ "AuthToken": "xyzzy",
+ "Insecure": false
+ },
+ "KeepServiceList": {
+ "items": [
+ {"uuid":"zzzzz-bi64l-abcdefghijklmno", "service_type":"disk", "service_host":"a.zzzzz.arvadosapi.com", "service_port":12345},
+ {"uuid":"zzzzz-bi64l-bcdefghijklmnop", "service_type":"blob", "service_host":"b.zzzzz.arvadosapi.com", "service_port":12345}
+ ]
+ },
+ "RunPeriod": "600s"
+ }`), &config), check.IsNil)
+ c.Assert(len(config.KeepServiceList.Items), check.Equals, 2)
+ c.Check(config.KeepServiceList.Items[0].UUID, check.Equals, "zzzzz-bi64l-abcdefghijklmno")
+ c.Check(config.KeepServiceList.Items[0].ServicePort, check.Equals, 12345)
+ c.Check(config.Client.AuthToken, check.Equals, "xyzzy")
+}
diff --git a/services/keep-balance/time_me.go b/services/keep-balance/time_me.go
new file mode 100644
index 0000000..e5f16b7
--- /dev/null
+++ b/services/keep-balance/time_me.go
@@ -0,0 +1,14 @@
+package main
+
+import (
+ "log"
+ "time"
+)
+
+func timeMe(logger *log.Logger, label string) func() {
+ t0 := time.Now()
+ logger.Printf("%s: start", label)
+ return func() {
+ logger.Printf("%s: took %v", label, time.Since(t0))
+ }
+}
diff --git a/services/keep-balance/usage.go b/services/keep-balance/usage.go
new file mode 100644
index 0000000..eb9990c
--- /dev/null
+++ b/services/keep-balance/usage.go
@@ -0,0 +1,83 @@
+package main
+
+import (
+ "flag"
+ "fmt"
+ "os"
+)
+
+var exampleConfigFile = []byte(`
+ {
+ "Client": {
+ "APIHost": "zzzzz.arvadosapi.com:443",
+ "AuthToken": "xyzzy",
+ "Insecure": false
+ },
+ "KeepServiceTypes": [
+ "disk"
+ ],
+ "RunPeriod": "600s"
+ }`)
+
+func usage() {
+ fmt.Fprintf(os.Stderr, `
+
+keep-balance rebalances a set of keepstore servers. It creates new
+copies of underreplicated blocks, deletes excess copies of
+overreplicated and unreferenced blocks, and moves blocks to better
+positions (according to the rendezvous hash algorithm) so clients find
+them faster.
+
+Usage: keep-balance -config path/to/config.json [options]
+
+Options:
+`)
+ flag.PrintDefaults()
+ fmt.Fprintf(os.Stderr, `
+Example config file:
+%s
+
+ Client.AuthToken must be recognized by Arvados as an admin token,
+ and must be recognized by all Keep services as a "data manager
+ key".
+
+ Client.Insecure should be true if your Arvados API endpoint uses
+ an unverifiable SSL/TLS certificate.
+
+Periodic scanning:
+
+ By default, keep-balance operates periodically, i.e.: do a
+ scan/balance operation, sleep, repeat.
+
+ RunPeriod determines the interval between start times of
+ successive scan/balance operations. If a scan/balance operation
+ takes longer than RunPeriod, the next one will follow it
+ immediately.
+
+ If SIGUSR1 is received during an idle period between operations,
+ the next operation will start immediately.
+
+One-time scanning:
+
+ Use the -once flag to do a single operation and then exit. The
+ exit code will be zero if the operation was successful.
+
+Committing:
+
+ By default, keep-service computes and reports changes but does not
+ implement them by sending pull and trash lists to the Keep
+ services.
+
+ Use the -commit-pull and -commit-trash flags to implement the
+ computed changes.
+
+Limitations:
+
+ keep-balance does not attempt to discover whether committed pull
+ and trash requests ever get carried out -- only that they are
+ accepted by the Keep services. If some services are full, new
+ copies of underreplicated blocks might never get made, only
+ repeatedly requested.
+
+`, exampleConfigFile)
+}
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 80d8670..d7da67c 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -4,6 +4,7 @@ import (
"bytes"
"flag"
"fmt"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"io/ioutil"
@@ -331,8 +332,12 @@ func main() {
}
// Initialize Pull queue and worker
+ arv, err := arvadosclient.MakeArvadosClient()
+ if err != nil {
+ log.Fatalf("MakeArvadosClient: %s", err)
+ }
keepClient := &keepclient.KeepClient{
- Arvados: nil,
+ Arvados: &arv,
Want_replicas: 1,
Client: &http.Client{},
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list