[ARVADOS] updated: c900f416c36cd74675c5bf4c33ad1dbe5d1e78fa

Git user git at public.curoverse.com
Tue May 31 16:24:13 EDT 2016

Summary of changes:
 build/package-build-dockerfiles/Makefile           |   8 +-
 build/package-build-dockerfiles/centos6/Dockerfile |   2 +-
 build/package-build-dockerfiles/debian7/Dockerfile |   2 +-
 build/package-build-dockerfiles/debian8/Dockerfile |   2 +-
 .../ubuntu1204/Dockerfile                          |   2 +-
 .../ubuntu1404/Dockerfile                          |   2 +-
 build/run-build-packages-one-target.sh             |   3 +-
 build/run-build-packages.sh                        |   2 +
 build/run-tests.sh                                 |   8 +-
 sdk/go/arvados/client.go                           | 169 ++++++
 sdk/go/arvados/client_test.go                      |  83 +++
 sdk/go/arvados/collection.go                       |  62 ++
 sdk/go/arvados/doc.go                              |  12 +
 sdk/go/arvados/duration.go                         |  31 +
 sdk/go/arvados/keep_block.go                       |  15 +
 sdk/go/arvados/keep_service.go                     | 123 ++++
 sdk/go/arvados/resource_list.go                    |  25 +
 sdk/go/arvados/resource_list_test.go               |  21 +
 sdk/go/arvados/user.go                             |  17 +
 services/keep-balance/balance.go                   | 638 +++++++++++++++++++++
 services/keep-balance/balance_run_test.go          | 374 ++++++++++++
 services/keep-balance/balance_test.go              | 255 ++++++++
 services/keep-balance/block_state.go               |  95 +++
 services/keep-balance/change_set.go                |  75 +++
 services/keep-balance/change_set_test.go           |  35 ++
 services/keep-balance/collection.go                |  95 +++
 services/keep-balance/integration_test.go          |  92 +++
 services/keep-balance/keep_service.go              |  76 +++
 services/keep-balance/main.go                      | 156 +++++
 services/keep-balance/main_test.go                 |  43 ++
 services/keep-balance/time_me.go                   |  14 +
 services/keep-balance/usage.go                     |  83 +++
 services/keepstore/keepstore.go                    |   7 +-
 33 files changed, 2616 insertions(+), 11 deletions(-)
 create mode 100644 sdk/go/arvados/client.go
 create mode 100644 sdk/go/arvados/client_test.go
 create mode 100644 sdk/go/arvados/collection.go
 create mode 100644 sdk/go/arvados/doc.go
 create mode 100644 sdk/go/arvados/duration.go
 create mode 100644 sdk/go/arvados/keep_block.go
 create mode 100644 sdk/go/arvados/keep_service.go
 create mode 100644 sdk/go/arvados/resource_list.go
 create mode 100644 sdk/go/arvados/resource_list_test.go
 create mode 100644 sdk/go/arvados/user.go
 create mode 100644 services/keep-balance/balance.go
 create mode 100644 services/keep-balance/balance_run_test.go
 create mode 100644 services/keep-balance/balance_test.go
 create mode 100644 services/keep-balance/block_state.go
 create mode 100644 services/keep-balance/change_set.go
 create mode 100644 services/keep-balance/change_set_test.go
 create mode 100644 services/keep-balance/collection.go
 create mode 100644 services/keep-balance/integration_test.go
 create mode 100644 services/keep-balance/keep_service.go
 create mode 100644 services/keep-balance/main.go
 create mode 100644 services/keep-balance/main_test.go
 create mode 100644 services/keep-balance/time_me.go
 create mode 100644 services/keep-balance/usage.go

       via  c900f416c36cd74675c5bf4c33ad1dbe5d1e78fa (commit)
       via  27f948d88bbdd5be848011871d2b592d7057ece1 (commit)
       via  b002129afda08bbb4fdbed6e629858a5c298c068 (commit)
      from  ae72b172c8bf8a52358a89f8a8d744ec5bf2d993 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.

commit c900f416c36cd74675c5bf4c33ad1dbe5d1e78fa
Merge: ae72b17 27f948d
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue May 31 16:23:30 2016 -0400

    Merge branch '9162-keep-balance'
    closes #9162

commit 27f948d88bbdd5be848011871d2b592d7057ece1
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue May 24 10:02:39 2016 -0400

    9162: Add replication level histogram
    Ported from 00a8ece1580a894dbbf9f756685eefc134e4d0d6 by jrandall

diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 7471aa6..2a2480c 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -3,6 +3,7 @@ package main
 import (
+	"math"
@@ -447,9 +448,11 @@ type balancerStats struct {
 	lost, overrep, unref, garbage, underrep, justright blocksNBytes
 	desired, current                                   blocksNBytes
 	pulls, trashes                                     int
+	replHistogram                                      []int
 func (bal *Balancer) getStatistics() (s balancerStats) {
+	s.replHistogram = make([]int, 2)
 	bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
 		surplus := len(blk.Replicas) - blk.Desired
 		bytes := blkid.Size()
@@ -493,6 +496,11 @@ func (bal *Balancer) getStatistics() (s balancerStats) {
 			s.current.bytes += bytes * int64(len(blk.Replicas))
+		for len(s.replHistogram) <= len(blk.Replicas) {
+			s.replHistogram = append(s.replHistogram, 0)
+		}
+		s.replHistogram[len(blk.Replicas)]++
 	for _, srv := range bal.KeepServices {
 		s.pulls += len(srv.ChangeSet.Pulls)
@@ -521,6 +529,25 @@ func (bal *Balancer) PrintStatistics() {
 		bal.logf("%s: %v\n", srv, srv.ChangeSet)
+	bal.printHistogram(s, 60)
+	bal.logf("===")
+// printHistogram logs one line per replication level in
+// s.replHistogram: the level, the number of blocks at that level, and
+// a bar of '#' characters whose length grows with log10 of the count.
+// The scale is chosen so that even the largest count's bar is shorter
+// than hashColumns, which keeps the bar[:...] slice in range.
+func (bal *Balancer) printHistogram(s balancerStats, hashColumns int) {
+	bal.logf("Replication level distribution (counting N replicas on a single server as N):")
+	peak := 0
+	for _, n := range s.replHistogram {
+		if n > peak {
+			peak = n
+		}
+	}
+	logPeak := math.Log10(float64(peak + 1))
+	// Digits needed for the largest count, so the bars line up.
+	width := int(logPeak) + 1
+	// floor(10*x+1) > 10*x, so scale*logPeak < hashColumns.
+	scale := 10 * float64(hashColumns) / math.Floor(10*logPeak+1)
+	bar := strings.Repeat("#", hashColumns)
+	for level := range s.replHistogram {
+		n := s.replHistogram[level]
+		bal.logf("%2d: %*d %s", level, width, n, bar[:int(scale*math.Log10(float64(n+1)))])
+	}
+}
 // CheckSanityLate checks for configuration and runtime errors after

commit b002129afda08bbb4fdbed6e629858a5c298c068
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon May 16 17:09:21 2016 -0400

    9162: Add keep-balance

diff --git a/build/package-build-dockerfiles/Makefile b/build/package-build-dockerfiles/Makefile
index 9216f82..3482886 100644
--- a/build/package-build-dockerfiles/Makefile
+++ b/build/package-build-dockerfiles/Makefile
@@ -20,10 +20,12 @@ ubuntu1404/generated: common-generated-all
 	test -d ubuntu1404/generated || mkdir ubuntu1404/generated
 	cp -rlt ubuntu1404/generated common-generated/*
-common-generated-all: common-generated/golang-amd64.tar.gz
-common-generated/golang-amd64.tar.gz: common-generated
-	wget -cqO common-generated/golang-amd64.tar.gz http://storage.googleapis.com/golang/go1.4.2.linux-amd64.tar.gz
+common-generated-all: common-generated/$(GOTARBALL)
+common-generated/$(GOTARBALL): common-generated
+	wget -cqO common-generated/$(GOTARBALL) http://storage.googleapis.com/golang/$(GOTARBALL)
 	mkdir common-generated
diff --git a/build/package-build-dockerfiles/centos6/Dockerfile b/build/package-build-dockerfiles/centos6/Dockerfile
index c21c091..570dde1 100644
--- a/build/package-build-dockerfiles/centos6/Dockerfile
+++ b/build/package-build-dockerfiles/centos6/Dockerfile
@@ -5,7 +5,7 @@ MAINTAINER Brett Smith <brett at curoverse.com>
 RUN yum -q -y install make automake gcc gcc-c++ libyaml-devel patch readline-devel zlib-devel libffi-devel openssl-devel bzip2 libtool bison sqlite-devel rpm-build git perl-ExtUtils-MakeMaker libattr-devel nss-devel libcurl-devel which tar unzip scl-utils centos-release-scl postgresql-devel
 # Install golang binary
-ADD generated/golang-amd64.tar.gz /usr/local/
+ADD generated/go1.6.2.linux-amd64.tar.gz /usr/local/
 RUN ln -s /usr/local/go/bin/go /usr/local/bin/
 # Install RVM
diff --git a/build/package-build-dockerfiles/debian7/Dockerfile b/build/package-build-dockerfiles/debian7/Dockerfile
index ccc444c..ddad542 100644
--- a/build/package-build-dockerfiles/debian7/Dockerfile
+++ b/build/package-build-dockerfiles/debian7/Dockerfile
@@ -13,7 +13,7 @@ RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
     /usr/local/rvm/bin/rvm-exec default gem install cure-fpm --version 1.6.0b
 # Install golang binary
-ADD generated/golang-amd64.tar.gz /usr/local/
+ADD generated/go1.6.2.linux-amd64.tar.gz /usr/local/
 RUN ln -s /usr/local/go/bin/go /usr/local/bin/
diff --git a/build/package-build-dockerfiles/debian8/Dockerfile b/build/package-build-dockerfiles/debian8/Dockerfile
index e32cfb4..80f06a2 100644
--- a/build/package-build-dockerfiles/debian8/Dockerfile
+++ b/build/package-build-dockerfiles/debian8/Dockerfile
@@ -13,7 +13,7 @@ RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
     /usr/local/rvm/bin/rvm-exec default gem install cure-fpm --version 1.6.0b
 # Install golang binary
-ADD generated/golang-amd64.tar.gz /usr/local/
+ADD generated/go1.6.2.linux-amd64.tar.gz /usr/local/
 RUN ln -s /usr/local/go/bin/go /usr/local/bin/
diff --git a/build/package-build-dockerfiles/ubuntu1204/Dockerfile b/build/package-build-dockerfiles/ubuntu1204/Dockerfile
index 34bf698..2f628b0 100644
--- a/build/package-build-dockerfiles/ubuntu1204/Dockerfile
+++ b/build/package-build-dockerfiles/ubuntu1204/Dockerfile
@@ -13,7 +13,7 @@ RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
     /usr/local/rvm/bin/rvm-exec default gem install cure-fpm --version 1.6.0b
 # Install golang binary
-ADD generated/golang-amd64.tar.gz /usr/local/
+ADD generated/go1.6.2.linux-amd64.tar.gz /usr/local/
 RUN ln -s /usr/local/go/bin/go /usr/local/bin/
diff --git a/build/package-build-dockerfiles/ubuntu1404/Dockerfile b/build/package-build-dockerfiles/ubuntu1404/Dockerfile
index 5377164..b9c003a 100644
--- a/build/package-build-dockerfiles/ubuntu1404/Dockerfile
+++ b/build/package-build-dockerfiles/ubuntu1404/Dockerfile
@@ -13,7 +13,7 @@ RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
     /usr/local/rvm/bin/rvm-exec default gem install cure-fpm --version 1.6.0b
 # Install golang binary
-ADD generated/golang-amd64.tar.gz /usr/local/
+ADD generated/go1.6.2.linux-amd64.tar.gz /usr/local/
 RUN ln -s /usr/local/go/bin/go /usr/local/bin/
diff --git a/build/run-build-packages-one-target.sh b/build/run-build-packages-one-target.sh
index 322129e..6fdffd0 100755
--- a/build/run-build-packages-one-target.sh
+++ b/build/run-build-packages-one-target.sh
@@ -128,9 +128,10 @@ if test -z "$packages" ; then
+        keep-balance
+        keep-block-check
-        keep-block-check
diff --git a/build/run-build-packages.sh b/build/run-build-packages.sh
index a51198f..7631718 100755
--- a/build/run-build-packages.sh
+++ b/build/run-build-packages.sh
@@ -384,6 +384,8 @@ package_go_binary services/keepstore keepstore \
     "Keep storage daemon, accessible to clients on the LAN"
 package_go_binary services/keepproxy keepproxy \
     "Make a Keep cluster accessible to clients that are not on the LAN"
+package_go_binary services/keep-balance keep-balance \
+    "Rebalance and garbage-collect data blocks stored in Arvados Keep"
 package_go_binary services/keep-web keep-web \
     "Static web hosting service for user data stored in Arvados Keep"
 package_go_binary services/datamanager arvados-data-manager \
diff --git a/build/run-tests.sh b/build/run-tests.sh
index d229341..30a80f5 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -70,6 +70,7 @@ services/fuse
@@ -79,6 +80,7 @@ sdk/cli
@@ -150,6 +152,8 @@ sanity_checks() {
     echo -n 'go: '
     go version \
         || fatal "No go binary. See http://golang.org/doc/install"
+    [[ $(go version) =~ go1.([0-9]+) ]] && [[ ${BASH_REMATCH[1]} -ge 6 ]] \
+        || fatal "Go >= 1.6 required. See http://golang.org/doc/install"
     echo -n 'gcc: '
     gcc --version | egrep ^gcc \
         || fatal "No gcc. Try: apt-get install build-essential"
@@ -499,7 +503,7 @@ do_test_once() {
                 # "go test -check.vv giturl" doesn't work, but this
                 # does:
-                cd "$WORKSPACE/$1" && go test ${short:+-short} ${coverflags[@]} ${testargs[$1]}
+                cd "$WORKSPACE/$1" && go test ${short:+-short} ${testargs[$1]}
                 # The above form gets verbose even when testargs is
                 # empty, so use this form in such cases:
@@ -703,6 +707,7 @@ do_install services/api apiserver
 declare -a gostuff
+    sdk/go/arvados
@@ -714,6 +719,7 @@ gostuff=(
+    services/keep-balance
diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go
new file mode 100644
index 0000000..ee830c8
--- /dev/null
+++ b/sdk/go/arvados/client.go
@@ -0,0 +1,169 @@
+package arvados
+import (
+	"crypto/tls"
+	"encoding/json"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"net/http"
+	"net/url"
+	"os"
+	"strings"
+)
+// A Client is an HTTP client with an API endpoint and a set of
+// Arvados credentials.
+// It offers methods for accessing individual Arvados APIs, and
+// methods that implement common patterns like fetching multiple pages
+// of results using List APIs.
+// API requests are always sent over https (see apiURL).
+type Client struct {
+	// HTTP client used to make requests. If nil,
+	// http.DefaultClient or InsecureHTTPClient will be used.
+	Client *http.Client
+	// Hostname (or host:port) of Arvados API server.
+	APIHost string
+	// User authentication token. If empty, Do sends no
+	// Authorization header.
+	AuthToken string
+	// Accept unverified certificates. This works only if the
+	// Client field is nil: otherwise, it has no effect.
+	Insecure bool
+// InsecureHTTPClient is the http.Client used by a Client whose
+// Insecure field is true and whose Client field is nil. It skips TLS
+// certificate verification.
+var InsecureHTTPClient = &http.Client{
+	Transport: &http.Transport{
+		TLSClientConfig: &tls.Config{
+			InsecureSkipVerify: true,
+		},
+	},
+}
+// NewClientFromEnv returns a Client configured from the
+// ARVADOS_API_HOST and ARVADOS_API_TOKEN environment variables. Any
+// non-empty ARVADOS_API_HOST_INSECURE value (even "0" or "false")
+// sets Insecure. The Client field is left nil, so the default or
+// insecure HTTP client is used.
+func NewClientFromEnv() *Client {
+	c := new(Client)
+	c.APIHost = os.Getenv("ARVADOS_API_HOST")
+	c.AuthToken = os.Getenv("ARVADOS_API_TOKEN")
+	c.Insecure = os.Getenv("ARVADOS_API_HOST_INSECURE") != ""
+	return c
+}
+// Do sends req with the client's HTTP client (see httpClient). When
+// AuthToken is non-empty, it first adds an
+// "Authorization: OAuth2 <token>" header to req.
+func (c *Client) Do(req *http.Request) (*http.Response, error) {
+	hc := c.httpClient()
+	if c.AuthToken == "" {
+		return hc.Do(req)
+	}
+	req.Header.Add("Authorization", "OAuth2 "+c.AuthToken)
+	return hc.Do(req)
+}
+// DoAndDecode performs req and reads the whole response body. If the
+// status is 200 and dst is non-nil, it unmarshals the body (which must
+// be JSON) into dst. Any other status is reported as an error. Use
+// this instead of RequestAndDecode if you need more control of the
+// http.Request object.
+func (c *Client) DoAndDecode(dst interface{}, req *http.Request) error {
+	resp, err := c.Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	body, err := ioutil.ReadAll(resp.Body)
+	switch {
+	case err != nil:
+		return err
+	case resp.StatusCode != 200:
+		return fmt.Errorf("request failed (%s): %s", req.URL, resp.Status)
+	case dst == nil:
+		return nil
+	default:
+		return json.Unmarshal(body, dst)
+	}
+}
+// RequestAndDecode performs an API request and unmarshals the
+// response (which must be JSON) into dst. Method and body arguments
+// are the same as for http.NewRequest(). The given path is added to
+// the server's scheme/host/port to form the request URL.
+//
+// params may be nil, a url.Values, or any value that encodes to a JSON
+// object. In the latter case each top-level key becomes a parameter:
+// string values are sent as-is and all other values are sent
+// JSON-encoded. For example, Foo{Bar: []int{1,2,3}, Baz: "waz"}
+// becomes url.Values{"Bar": {"[1,2,3]"}, "Baz": {"waz"}}.
+//
+// Params are sent in the query string for GET and HEAD requests and
+// for requests that already have a body. Otherwise (e.g. POST with a
+// nil body) they are sent as a url-encoded form in the request body.
+//
+// path must not contain a query string.
+func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error {
+	urlString := c.apiURL(path)
+	urlValues, err := paramsToValues(params)
+	if err != nil {
+		return err
+	}
+	formBody := false
+	switch {
+	case urlValues == nil:
+		// Nothing to send.
+	case method == "GET" || method == "HEAD" || body != nil:
+		// FIXME: what if params don't fit in URL
+		u, err := url.Parse(urlString)
+		if err != nil {
+			return err
+		}
+		u.RawQuery = urlValues.Encode()
+		urlString = u.String()
+	default:
+		// No caller-supplied body: carry the params in a form
+		// body so they are not silently dropped.
+		body = strings.NewReader(urlValues.Encode())
+		formBody = true
+	}
+	req, err := http.NewRequest(method, urlString, body)
+	if err != nil {
+		return err
+	}
+	if formBody {
+		req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
+	}
+	return c.DoAndDecode(dst, req)
+}
+// paramsToValues converts RequestAndDecode's params argument to
+// url.Values. It returns nil (and no error) when params is nil.
+func paramsToValues(params interface{}) (url.Values, error) {
+	if v, ok := params.(url.Values); ok {
+		return v, nil
+	}
+	if params == nil {
+		return nil, nil
+	}
+	// TODO: Do this more efficiently, possibly using
+	// json.Decode/Encode, so the whole thing doesn't have
+	// to get encoded, decoded, and re-encoded.
+	j, err := json.Marshal(params)
+	if err != nil {
+		return nil, err
+	}
+	var generic map[string]interface{}
+	if err := json.Unmarshal(j, &generic); err != nil {
+		return nil, err
+	}
+	values := url.Values{}
+	for k, v := range generic {
+		if s, ok := v.(string); ok {
+			values.Set(k, s)
+			continue
+		}
+		j, err := json.Marshal(v)
+		if err != nil {
+			return nil, err
+		}
+		values.Set(k, string(j))
+	}
+	return values, nil
+}
+// httpClient picks the *http.Client for requests: c.Client if set,
+// otherwise InsecureHTTPClient when c.Insecure is true, otherwise
+// http.DefaultClient.
+func (c *Client) httpClient() *http.Client {
+	if c.Client != nil {
+		return c.Client
+	}
+	if c.Insecure {
+		return InsecureHTTPClient
+	}
+	return http.DefaultClient
+}
+// apiURL returns the https URL for path on c.APIHost; path is
+// appended verbatim after a single "/".
+func (c *Client) apiURL(path string) string {
+	return fmt.Sprintf("https://%s/%s", c.APIHost, path)
+}
+// DiscoveryDocument is the Arvados server's description of itself.
+// Only the fields below are decoded; other keys in the server's
+// discovery document are ignored by json.Unmarshal.
+type DiscoveryDocument struct {
+	// Decoded from the "defaultCollectionReplication" key.
+	DefaultCollectionReplication int   `json:"defaultCollectionReplication"`
+	// Decoded from the "blobSignatureTtl" key (presumably seconds
+	// — confirm against the API server).
+	BlobSignatureTTL             int64 `json:"blobSignatureTtl"`
+// DiscoveryDocument fetches the server's discovery document and
+// returns it as a *DiscoveryDocument. The returned object should not
+// be modified: the same object may be returned by subsequent calls.
+// On error the returned pointer is nil, rather than a partially
+// decoded document.
+func (c *Client) DiscoveryDocument() (*DiscoveryDocument, error) {
+	var dd DiscoveryDocument
+	if err := c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil); err != nil {
+		return nil, err
+	}
+	return &dd, nil
+}
diff --git a/sdk/go/arvados/client_test.go b/sdk/go/arvados/client_test.go
new file mode 100644
index 0000000..2db50bf
--- /dev/null
+++ b/sdk/go/arvados/client_test.go
@@ -0,0 +1,83 @@
+package arvados
+import (
+	"bytes"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"sync"
+	"testing"
+// stubTransport is a fake http.RoundTripper for tests: it records
+// each request and serves canned response bodies keyed by URL path.
+type stubTransport struct {
+	// Response bodies, keyed by request URL path.
+	Responses map[string]string
+	// Copies of every request received, in order. RoundTrip appends
+	// while holding the embedded Mutex.
+	Requests  []http.Request
+	sync.Mutex
+// RoundTrip records a copy of req and answers with
+// stub.Responses[req.URL.Path] as a 200 response. If there is no
+// (non-empty) entry for that path, it answers 404 with body "{}". It
+// never returns an error.
+func (stub *stubTransport) RoundTrip(req *http.Request) (*http.Response, error) {
+	stub.Lock()
+	stub.Requests = append(stub.Requests, *req)
+	stub.Unlock()
+	status, code := "200 OK", 200
+	body := stub.Responses[req.URL.Path]
+	if body == "" {
+		status, code, body = "404 Not Found", 404, "{}"
+	}
+	buf := bytes.NewBufferString(body)
+	return &http.Response{
+		Status:        status,
+		StatusCode:    code,
+		Proto:         "HTTP/1.1",
+		ProtoMajor:    1,
+		ProtoMinor:    1,
+		Request:       req,
+		Body:          ioutil.NopCloser(buf),
+		ContentLength: int64(buf.Len()),
+	}, nil
+}
+// errorTransport is an http.RoundTripper on which every request fails.
+type errorTransport struct{}
+// RoundTrip ignores its request and always returns an error.
+func (*errorTransport) RoundTrip(*http.Request) (*http.Response, error) {
+	return nil, fmt.Errorf("something awful happened")
+}
+// TestCurrentUser checks that CurrentUser decodes the server's reply,
+// sends the OAuth2 Authorization header, and reports transport errors.
+func TestCurrentUser(t *testing.T) {
+	t.Parallel()
+	const wantUUID = "zzzzz-abcde-012340123401234"
+	stub := &stubTransport{
+		Responses: map[string]string{
+			"/arvados/v1/users/current": `{"uuid":"zzzzz-abcde-012340123401234"}`,
+		},
+	}
+	c := &Client{
+		Client:    &http.Client{Transport: stub},
+		APIHost:   "zzzzz.arvadosapi.com",
+		AuthToken: "xyzzy",
+	}
+	u, err := c.CurrentUser()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if u.UUID != wantUUID {
+		t.Errorf("got uuid %q, expected %q", u.UUID, wantUUID)
+	}
+	n := len(stub.Requests)
+	if n == 0 {
+		t.Fatal("empty stub.Requests")
+	}
+	if hdr := stub.Requests[n-1].Header; hdr.Get("Authorization") != "OAuth2 xyzzy" {
+		t.Errorf("got headers %+q, expected Authorization header", hdr)
+	}
+	c.Client.Transport = &errorTransport{}
+	if _, err := c.CurrentUser(); err == nil {
+		t.Errorf("got nil error, expected something awful")
+	}
+}
diff --git a/sdk/go/arvados/collection.go b/sdk/go/arvados/collection.go
new file mode 100644
index 0000000..71f5247
--- /dev/null
+++ b/sdk/go/arvados/collection.go
@@ -0,0 +1,62 @@
+package arvados
+import (
+	"bufio"
+	"fmt"
+	"strings"
+	"time"
+	"git.curoverse.com/arvados.git/sdk/go/manifest"
+// Collection is an arvados#collection resource.
+// Pointer-typed fields are nil when the attribute is absent (or null)
+// in decoded JSON. Every field carries omitempty, so empty/nil fields
+// are left out of encoded JSON.
+type Collection struct {
+	UUID                   string     `json:"uuid,omitempty"`
+	ExpiresAt              *time.Time `json:"expires_at,omitempty"`
+	ManifestText           string     `json:"manifest_text,omitempty"`
+	CreatedAt              *time.Time `json:"created_at,omitempty"`
+	ModifiedAt             *time.Time `json:"modified_at,omitempty"`
+	PortableDataHash       string     `json:"portable_data_hash,omitempty"`
+	ReplicationConfirmed   *int       `json:"replication_confirmed,omitempty"`
+	ReplicationConfirmedAt *time.Time `json:"replication_confirmed_at,omitempty"`
+	ReplicationDesired     *int       `json:"replication_desired,omitempty"`
+// SizedDigests returns the hash+size part of each data block locator
+// in the collection's manifest, in manifest order (duplicates
+// included). It fails if ManifestText is empty (unless
+// PortableDataHash is the empty collection's hash) or if a stream
+// line has fewer than 3 tokens.
+func (c *Collection) SizedDigests() ([]SizedDigest, error) {
+	const emptyPDH = "d41d8cd98f00b204e9800998ecf8427e+0"
+	if c.ManifestText == "" && c.PortableDataHash != emptyPDH {
+		// TODO: Check more subtle forms of corruption, too
+		return nil, fmt.Errorf("manifest is missing")
+	}
+	var digests []SizedDigest
+	lines := bufio.NewScanner(strings.NewReader(c.ManifestText))
+	// Lines up to max(1 MiB, len(ManifestText)) bytes are accepted.
+	lines.Buffer(make([]byte, 1048576), len(c.ManifestText))
+	for lines.Scan() {
+		line := lines.Text()
+		tokens := strings.Split(line, " ")
+		if len(tokens) < 3 {
+			return nil, fmt.Errorf("Invalid stream (<3 tokens): %q", line)
+		}
+		// Skip the stream name, then take tokens until the first
+		// non-locator. FIXME: ensure it's a file token
+		for i := 1; i < len(tokens) && manifest.LocatorPattern.MatchString(tokens[i]); i++ {
+			loc := tokens[i]
+			// Drop any hints after hash+size.
+			// FIXME: shouldn't assume 32 char hash
+			if j := strings.IndexByte(loc[33:], '+'); j >= 0 {
+				loc = loc[:33+j]
+			}
+			digests = append(digests, SizedDigest(loc))
+		}
+	}
+	return digests, lines.Err()
+}
+// CollectionList is an arvados#collectionList resource: the items
+// returned by a collections list call, plus paging information.
+type CollectionList struct {
+	Items          []Collection `json:"items"`
+	// Number of matching items available (presumably across all
+	// pages — confirm against the API documentation).
+	ItemsAvailable int          `json:"items_available"`
+	Offset         int          `json:"offset"`
+	Limit          int          `json:"limit"`
diff --git a/sdk/go/arvados/doc.go b/sdk/go/arvados/doc.go
new file mode 100644
index 0000000..1e8141e
--- /dev/null
+++ b/sdk/go/arvados/doc.go
@@ -0,0 +1,12 @@
+// Package arvados is a client library for Arvados.
+// The API is not stable: it should be considered experimental
+// pre-release.
+// The intent is to offer model types and API call functions that can
+// be generated automatically (or at least mostly automatically) from
+// a discovery document. For the time being, there is a manually
+// written subset of those types and API calls with (approximately)
+// the right signatures, plus client/authentication support and some
+// convenience functions.
+package arvados
diff --git a/sdk/go/arvados/duration.go b/sdk/go/arvados/duration.go
new file mode 100644
index 0000000..1639c58
--- /dev/null
+++ b/sdk/go/arvados/duration.go
@@ -0,0 +1,31 @@
+package arvados
+import (
+	"encoding/json"
+	"fmt"
+	"time"
+// Duration is time.Duration but looks like "12s" in JSON, rather than
+// a number of nanoseconds.
+type Duration time.Duration
+// UnmarshalJSON implements json.Unmarshaler. The value must be a JSON
+// string accepted by time.ParseDuration, such as "600s" or "1h30m".
+// JSON null leaves d unchanged, following the encoding/json convention
+// for Unmarshalers.
+func (d *Duration) UnmarshalJSON(data []byte) error {
+	if string(data) == "null" {
+		return nil
+	}
+	// Guard against empty input as well as non-string JSON values.
+	if len(data) == 0 || data[0] != '"' {
+		return fmt.Errorf("duration must be given as a string like \"600s\" or \"1h30m\"")
+	}
+	var s string
+	if err := json.Unmarshal(data, &s); err != nil {
+		return err
+	}
+	dur, err := time.ParseDuration(s)
+	*d = Duration(dur)
+	return err
+}
+// MarshalJSON implements json.Marshaler. It uses a value receiver so
+// that Duration values that are not addressable (e.g. a field of a
+// struct passed to json.Marshal by value) are still encoded as
+// strings like "1m30s".
+func (d Duration) MarshalJSON() ([]byte, error) {
+	return json.Marshal(d.String())
+}
+// String implements fmt.Stringer
+func (d Duration) String() string {
+	return time.Duration(d).String()
+}
diff --git a/sdk/go/arvados/keep_block.go b/sdk/go/arvados/keep_block.go
new file mode 100644
index 0000000..c9a7712
--- /dev/null
+++ b/sdk/go/arvados/keep_block.go
@@ -0,0 +1,15 @@
+package arvados
+import (
+	"strconv"
+	"strings"
+// SizedDigest is a minimal Keep block locator: hash+size
+type SizedDigest string
+// Size returns the size of the data block, in bytes: the decimal
+// field after the first "+" (up to the next "+", if any). It returns 0
+// when sd has no "+" or the size field is not a decimal number,
+// instead of panicking on a malformed digest.
+func (sd SizedDigest) Size() int64 {
+	s := string(sd)
+	i := strings.IndexByte(s, '+')
+	if i < 0 {
+		return 0
+	}
+	s = s[i+1:]
+	if j := strings.IndexByte(s, '+'); j >= 0 {
+		s = s[:j]
+	}
+	// Parse errors are deliberately ignored; see doc comment.
+	n, _ := strconv.ParseInt(s, 10, 64)
+	return n
+}
diff --git a/sdk/go/arvados/keep_service.go b/sdk/go/arvados/keep_service.go
new file mode 100644
index 0000000..4af1b79
--- /dev/null
+++ b/sdk/go/arvados/keep_service.go
@@ -0,0 +1,123 @@
+package arvados
+import (
+	"bufio"
+	"fmt"
+	"net/http"
+	"strconv"
+	"strings"
+// KeepService is an arvados#keepService record
+type KeepService struct {
+	UUID           string `json:"uuid"`
+	ServiceHost    string `json:"service_host"`
+	ServicePort    int    `json:"service_port"`
+	// If true, the service is reached via https rather than http
+	// (see url).
+	ServiceSSLFlag bool   `json:"service_ssl_flag"`
+	ServiceType    string `json:"service_type"`
+	ReadOnly       bool   `json:"read_only"`
+// KeepServiceList is an arvados#keepServiceList record
+type KeepServiceList struct {
+	Items          []KeepService `json:"items"`
+	// Number of records available; EachKeepService keeps requesting
+	// pages until its offset reaches this value.
+	ItemsAvailable int           `json:"items_available"`
+	Offset         int           `json:"offset"`
+	Limit          int           `json:"limit"`
+// KeepServiceIndexEntry is what a keep service's index response tells
+// us about a stored block.
+type KeepServiceIndexEntry struct {
+	SizedDigest
+	// Second field of the index line, parsed as a decimal int64
+	// (presumably the block's modification time; units not shown
+	// here — confirm against keepstore).
+	Mtime int64
+// EachKeepService calls f once for each KeepService returned by the
+// arvados/v1/keep_services list API, fetching as many pages as
+// needed. EachKeepService stops if it encounters an error, such as f
+// returning a non-nil error.
+func (c *Client) EachKeepService(f func(KeepService) error) error {
+	params := ResourceListParams{}
+	for {
+		var page KeepServiceList
+		err := c.RequestAndDecode(&page, "GET", "arvados/v1/keep_services", nil, params)
+		if err != nil {
+			return err
+		}
+		for _, item := range page.Items {
+			err = f(item)
+			if err != nil {
+				return err
+			}
+		}
+		// An empty page means the offset cannot advance (e.g.
+		// items_available changed between requests); stop instead
+		// of re-requesting the same page forever.
+		if len(page.Items) == 0 {
+			return nil
+		}
+		params.Offset = params.Offset + len(page.Items)
+		if params.Offset >= page.ItemsAvailable {
+			return nil
+		}
+	}
+}
+// url returns the URL of path on this keep service, using https when
+// ServiceSSLFlag is set and http otherwise.
+func (s *KeepService) url(path string) string {
+	scheme := "http"
+	if s.ServiceSSLFlag {
+		scheme = "https"
+	}
+	return fmt.Sprintf("%s://%s:%d/%s", scheme, s.ServiceHost, s.ServicePort, path)
+}
+// String implements fmt.Stringer by returning the service's UUID.
+func (s *KeepService) String() string {
+	return s.UUID
+}
+// Index returns an unsorted list of blocks that can be retrieved from
+// this server. prefix is appended to the "index/" request path
+// (presumably a locator-prefix filter — confirm against keepstore).
+// It returns an error if the request fails, the response status is
+// not 200, any index line is malformed, or the response lacks the
+// terminating blank line that marks a complete index.
+func (s *KeepService) Index(c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
+	url := s.url("index/" + prefix)
+	req, err := http.NewRequest("GET", url, nil)
+	if err != nil {
+		return nil, fmt.Errorf("NewRequest(%v): %v", url, err)
+	}
+	resp, err := c.Do(req)
+	if err != nil {
+		return nil, fmt.Errorf("Do(%v): %v", url, err)
+	}
+	// Close the body on every path, including non-200 responses,
+	// so the connection is not leaked.
+	defer resp.Body.Close()
+	if resp.StatusCode != 200 {
+		return nil, fmt.Errorf("%v: %v", url, resp.Status)
+	}
+	var entries []KeepServiceIndexEntry
+	scanner := bufio.NewScanner(resp.Body)
+	sawEOF := false
+	for scanner.Scan() {
+		if sawEOF {
+			return nil, fmt.Errorf("Index response contained non-terminal blank line")
+		}
+		line := scanner.Text()
+		if line == "" {
+			// A blank line marks the end of a complete index.
+			sawEOF = true
+			continue
+		}
+		fields := strings.Split(line, " ")
+		if len(fields) != 2 {
+			return nil, fmt.Errorf("Malformed index line %q: %d fields", line, len(fields))
+		}
+		mtime, err := strconv.ParseInt(fields[1], 10, 64)
+		if err != nil {
+			return nil, fmt.Errorf("Malformed index line %q: mtime: %v", line, err)
+		}
+		entries = append(entries, KeepServiceIndexEntry{
+			SizedDigest: SizedDigest(fields[0]),
+			Mtime:       mtime,
+		})
+	}
+	if err := scanner.Err(); err != nil {
+		return nil, fmt.Errorf("Error scanning index response: %v", err)
+	}
+	if !sawEOF {
+		return nil, fmt.Errorf("Index response had no EOF marker")
+	}
+	return entries, nil
+}
diff --git a/sdk/go/arvados/resource_list.go b/sdk/go/arvados/resource_list.go
new file mode 100644
index 0000000..e9ea268
--- /dev/null
+++ b/sdk/go/arvados/resource_list.go
@@ -0,0 +1,25 @@
+package arvados
+import "encoding/json"
+// ResourceListParams expresses which results are requested in a
+// list/index API. Empty fields are omitted when encoding; Limit is
+// omitted only when nil, so a non-nil Limit can request a limit of 0.
+type ResourceListParams struct {
+	Select  []string `json:"select,omitempty"`
+	Filters []Filter `json:"filters,omitempty"`
+	Limit   *int     `json:"limit,omitempty"`
+	Offset  int      `json:"offset,omitempty"`
+	Order   string   `json:"order,omitempty"`
+}
+// A Filter restricts the set of records returned by a list/index API.
+// It is encoded as the JSON array [Attr, Operator, Operand].
+type Filter struct {
+	Attr     string
+	Operator string
+	Operand  interface{}
+}
+// MarshalJSON encodes a Filter in the form expected by the API. It
+// uses a value receiver so that Filter values which are not
+// addressable (e.g. json.Marshal(f), or a Filter held in an interface
+// or map) are encoded as arrays too, not as plain JSON objects.
+func (f Filter) MarshalJSON() ([]byte, error) {
+	return json.Marshal([]interface{}{f.Attr, f.Operator, f.Operand})
+}
diff --git a/sdk/go/arvados/resource_list_test.go b/sdk/go/arvados/resource_list_test.go
new file mode 100644
index 0000000..b5e6e7d
--- /dev/null
+++ b/sdk/go/arvados/resource_list_test.go
@@ -0,0 +1,21 @@
+package arvados
+import (
+	"bytes"
+	"encoding/json"
+	"testing"
+	"time"
+// TestMarshalFiltersWithNanoseconds checks that a time.Time operand
+// keeps full RFC3339Nano precision when a []Filter is encoded.
+func TestMarshalFiltersWithNanoseconds(t *testing.T) {
+	now := time.Now()
+	filters := []Filter{{Attr: "modified_at", Operator: "=", Operand: now}}
+	got, err := json.Marshal(filters)
+	if err != nil {
+		t.Fatal(err)
+	}
+	want := []byte(`[["modified_at","=","` + now.Format(time.RFC3339Nano) + `"]]`)
+	if !bytes.Equal(got, want) {
+		t.Errorf("Encoded as %q, expected %q", got, want)
+	}
+}
diff --git a/sdk/go/arvados/user.go b/sdk/go/arvados/user.go
new file mode 100644
index 0000000..684a3af
--- /dev/null
+++ b/sdk/go/arvados/user.go
@@ -0,0 +1,17 @@
+package arvados
+// User is an arvados#user record
+type User struct {
+	UUID     string `json:"uuid,omitempty"`
+	IsActive bool   `json:"is_active"`
+	IsAdmin  bool   `json:"is_admin"`
+	Username string `json:"username,omitempty"`
+// CurrentUser calls arvados.v1.users.current and returns the User
+// record for this client's credentials. On error the returned User
+// may be partially populated and should be ignored.
+func (c *Client) CurrentUser() (User, error) {
+	var u User
+	if err := c.RequestAndDecode(&u, "GET", "arvados/v1/users/current", nil, nil); err != nil {
+		return u, err
+	}
+	return u, nil
+}
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
new file mode 100644
index 0000000..7471aa6
--- /dev/null
+++ b/services/keep-balance/balance.go
@@ -0,0 +1,611 @@
+package main
+import (
+	"fmt"
+	"log"
+	"os"
+	"runtime"
+	"strings"
+	"sync"
+	"time"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"git.curoverse.com/arvados.git/sdk/go/keepclient"
+// CheckConfig validates config together with runOptions. It returns
+// an error describing the first problem it finds, or nil when the
+// combination is usable.
+func CheckConfig(config Config, runOptions RunOptions) error {
+	explicitList := len(config.KeepServiceList.Items) > 0
+	switch {
+	case explicitList && config.KeepServiceTypes != nil:
+		return fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
+	case !runOptions.Once && config.RunPeriod == arvados.Duration(0):
+		return fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
+	default:
+		return nil
+	}
+}
+// Balancer compares the contents of keepstore servers with the
+// collections stored in Arvados, and issues the pull/trash requests
+// needed to get (closer to) the optimal data layout.
+//
+// In the optimal data layout: every data block referenced by a
+// collection is replicated at least as many times as desired by the
+// collection; there are no unreferenced data blocks older than
+// BlobSignatureTTL; and all N existing replicas of a given data block
+// are in the N best positions in rendezvous probe order.
+//
+// The unexported fields hold per-run bookkeeping: the number of
+// collections scanned, the UUID map handed to the rendezvous sorter,
+// and deferred errors (appended under mutex while collections are
+// processed).
+type Balancer struct {
+	*BlockStateMap
+	KeepServices       map[string]*KeepService
+	DefaultReplication int
+	Logger             *log.Logger
+	Dumper             *log.Logger
+	MinMtime           int64
+	collScanned  int
+	serviceRoots map[string]string
+	errors       []error
+	mutex        sync.Mutex
+}
+// Run performs one balance operation using config and runOptions. It
+// selects the keep services to work on, runs the early sanity checks,
+// and (when committing trash) clears existing trash lists. It then
+// reads the current state, computes and prints the changes, and runs
+// the late sanity checks. Finally it sends pull and/or trash lists as
+// runOptions request.
+//
+// Run should only be called once on a given Balancer object. Typical
+// usage:
+//   err = (&Balancer{}).Run(config, runOptions)
+func (bal *Balancer) Run(config Config, runOptions RunOptions) (err error) {
+	bal.Dumper = runOptions.Dumper
+	if bal.Logger = runOptions.Logger; bal.Logger == nil {
+		bal.Logger = log.New(os.Stderr, "", log.LstdFlags)
+	}
+	defer timeMe(bal.Logger, "Run")()
+	client := &config.Client
+	if len(config.KeepServiceList.Items) == 0 {
+		err = bal.DiscoverKeepServices(client, config.KeepServiceTypes)
+	} else {
+		err = bal.SetKeepServices(config.KeepServiceList)
+	}
+	if err != nil {
+		return err
+	}
+	if err = bal.CheckSanityEarly(client); err != nil {
+		return err
+	}
+	if runOptions.CommitTrash {
+		if err = bal.ClearTrashLists(client); err != nil {
+			return err
+		}
+	}
+	if err = bal.GetCurrentState(client); err != nil {
+		return err
+	}
+	bal.ComputeChangeSets()
+	bal.PrintStatistics()
+	if err = bal.CheckSanityLate(); err != nil {
+		return err
+	}
+	if runOptions.CommitPulls {
+		if err = bal.CommitPulls(client); err != nil {
+			// Don't send trash lists when pull lists could not
+			// be delivered. (Possibly overcautious.)
+			return err
+		}
+	}
+	if runOptions.CommitTrash {
+		err = bal.CommitTrash(client)
+	}
+	return err
+}
+// SetKeepServices replaces bal.KeepServices with the services listed
+// in srvList, keyed by UUID, each paired with an empty ChangeSet. It
+// always returns nil.
+func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
+	services := make(map[string]*KeepService)
+	for _, item := range srvList.Items {
+		services[item.UUID] = &KeepService{KeepService: item, ChangeSet: &ChangeSet{}}
+	}
+	bal.KeepServices = services
+	return nil
+}
+// DiscoverKeepServices rebuilds bal.KeepServices from the API's list
+// of keep services. Only services whose ServiceType appears in
+// okTypes are kept, and the others are logged and skipped. Any error
+// from c.EachKeepService is returned.
+func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error {
+	bal.KeepServices = make(map[string]*KeepService)
+	wanted := make(map[string]bool, len(okTypes))
+	for _, typ := range okTypes {
+		wanted[typ] = true
+	}
+	return c.EachKeepService(func(srv arvados.KeepService) error {
+		if !wanted[srv.ServiceType] {
+			bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
+			return nil
+		}
+		bal.KeepServices[srv.UUID] = &KeepService{
+			KeepService: srv,
+			ChangeSet:   &ChangeSet{},
+		}
+		return nil
+	})
+}
+// CheckSanityEarly checks for problems that can be detected before
+// GetCurrentState() and ComputeChangeSets() are called. The client's
+// user must be an active admin, and no selected service may be a
+// proxy.
+//
+// If it returns an error, running GetCurrentState or
+// ComputeChangeSets is pointless: the resulting statistics would be
+// meaningless, and running any Commit method would be dangerous.
+func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
+	user, err := c.CurrentUser()
+	switch {
+	case err != nil:
+		return fmt.Errorf("CurrentUser(): %v", err)
+	case !(user.IsActive && user.IsAdmin):
+		return fmt.Errorf("current user (%s) is not an active admin user", user.UUID)
+	}
+	for _, srv := range bal.KeepServices {
+		if srv.ServiceType != "proxy" {
+			continue
+		}
+		return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
+	}
+	return nil
+}
+// ClearTrashLists gives every keep service a fresh, empty ChangeSet
+// and then sends those empty trash lists. Calling it before
+// GetCurrentState avoids a race.
+//
+// When a block appears in an index, we assume that replica will still
+// exist after we delete replicas elsewhere. However, a previous
+// rebalancing run may have made different decisions (e.g., services
+// were added/removed, and rendezvous order changed). In that case the
+// replica might already be on that server's trash list, and be
+// deleted before we send a replacement list. Clearing every trash
+// list before reading indexes avoids this. (We also assume only one
+// rebalancing process runs at a time.)
+func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
+	for _, srv := range bal.KeepServices {
+		srv.ChangeSet = new(ChangeSet)
+	}
+	return bal.CommitTrash(c)
+}
+// GetCurrentState determines the current replication state, and the
+// desired replication level, for every block that is either
+// retrievable or referenced.
+// It determines the current replication state by reading the block index
+// from every known Keep service.
+// It determines the desired replication level by retrieving all
+// collection manifests in the database (API server).
+// It encodes the resulting information in BlockStateMap.
+func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
+	defer timeMe(bal.Logger, "GetCurrentState")()
+	bal.BlockStateMap = NewBlockStateMap()
+	dd, err := c.DiscoveryDocument()
+	if err != nil {
+		return err
+	}
+	bal.DefaultReplication = dd.DefaultCollectionReplication
+	bal.MinMtime = time.Now().Unix() - dd.BlobSignatureTTL
+	errs := make(chan error, 2+len(bal.KeepServices))
+	wg := sync.WaitGroup{}
+	// Start one goroutine for each KeepService: retrieve the
+	// index, and add the returned blocks to BlockStateMap.
+	for _, srv := range bal.KeepServices {
+		wg.Add(1)
+		go func(srv *KeepService) {
+			defer wg.Done()
+			bal.logf("%s: retrieve index", srv)
+			idx, err := srv.Index(c, "")
+			if err != nil {
+				errs <- fmt.Errorf("%s: %v", srv, err)
+				return
+			}
+			bal.logf("%s: add %d replicas to map", srv, len(idx))
+			bal.BlockStateMap.AddReplicas(srv, idx)
+			bal.logf("%s: done", srv)
+		}(srv)
+	}
+	// collQ buffers incoming collections so we can start fetching
+	// the next page without waiting for the current page to
+	// finish processing. (1000 happens to match the page size
+	// used by (*arvados.Client)EachCollection(), but it's OK if
+	// they don't match.)
+	collQ := make(chan arvados.Collection, 1000)
+	// Start a goroutine to process collections. (We could use a
+	// worker pool here, but even with a single worker we already
+	// process collections much faster than we can retrieve them.)
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for coll := range collQ {
+			err := bal.addCollection(coll)
+			if err != nil {
+				errs <- err
+				for range collQ {
+				}
+				return
+			}
+			bal.collScanned++
+		}
+	}()
+	// Start a goroutine to retrieve all collections from the
+	// Arvados database and send them to collQ for processing.
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		err = EachCollection(c,
+			func(coll arvados.Collection) error {
+				collQ <- coll
+				if len(errs) > 0 {
+					// some other GetCurrentState
+					// error happened: no point
+					// getting any more
+					// collections.
+					return fmt.Errorf("")
+				}
+				return nil
+			}, func(done, total int) {
+				bal.logf("collections: %d/%d", done, total)
+			})
+		close(collQ)
+		if err != nil {
+			errs <- err
+		}
+	}()
+	go func() {
+		// Send a nil error when all goroutines finish. If
+		// this is the first error sent to errs, then
+		// everything worked.
+		wg.Wait()
+		errs <- nil
+	}()
+	return <-errs
+func (bal *Balancer) addCollection(coll arvados.Collection) error {
+	blkids, err := coll.SizedDigests()
+	if err != nil {
+		bal.mutex.Lock()
+		bal.errors = append(bal.errors, fmt.Errorf("%v: %v", coll.UUID, err))
+		bal.mutex.Unlock()
+		return nil
+	}
+	repl := bal.DefaultReplication
+	if coll.ReplicationDesired != nil {
+		repl = *coll.ReplicationDesired
+	}
+	debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
+	bal.BlockStateMap.IncreaseDesired(repl, blkids)
+	return nil
+// ComputeChangeSets compares, for each known block, the current and
+// desired replication states. Wherever copying or deleting replicas
+// would get closer to the desired state, it records those changes in
+// the relevant KeepServices' ChangeSets.
+//
+// It does not actually apply any of the computed changes.
+func (bal *Balancer) ComputeChangeSets() {
+	defer timeMe(bal.Logger, "ComputeChangeSets")()
+	bal.setupServiceRoots()
+	// Every block is handed to a pool of worker goroutines, each of
+	// which calls balanceBlock on the blocks it receives.
+	type balanceTask struct {
+		blkid arvados.SizedDigest
+		blk   *BlockState
+	}
+	workers := runtime.NumCPU() + 1
+	tasks := make(chan balanceTask, workers)
+	var wg sync.WaitGroup
+	wg.Add(workers)
+	for w := 0; w < workers; w++ {
+		go func() {
+			defer wg.Done()
+			for task := range tasks {
+				bal.balanceBlock(task.blkid, task.blk)
+			}
+		}()
+	}
+	bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
+		tasks <- balanceTask{blkid: blkid, blk: blk}
+	})
+	close(tasks)
+	wg.Wait()
+}
+// setupServiceRoots fills bal.serviceRoots with one entry per keep
+// service, mapping the service's UUID to itself. balanceBlock passes
+// this map to keepclient.NewRootSorter, so the sorted roots it gets
+// back are service UUIDs.
+func (bal *Balancer) setupServiceRoots() {
+	roots := make(map[string]string, len(bal.KeepServices))
+	for _, srv := range bal.KeepServices {
+		roots[srv.UUID] = srv.UUID
+	}
+	bal.serviceRoots = roots
+}
+// Outcomes balanceBlock can choose for one keep service's copy of one
+// block. changeName supplies the label printed for each in the Dumper
+// output.
+const (
+	changeStay  = iota // existing replica is kept
+	changePull         // a pull to this service is requested
+	changeTrash        // existing replica is trashed
+	changeNone         // no replica here, and none requested
+)
+var changeName = map[int]string{
+	changeStay:  "stay",
+	changePull:  "pull",
+	changeTrash: "trash",
+	changeNone:  "none",
+}
+// balanceBlock compares current state to desired state for a single
+// block, and makes the appropriate ChangeSet calls.
+//
+// Services are visited in rendezvous probe order for the block
+// (keepclient.NewRootSorter over bal.serviceRoots, keyed by the first
+// 32 characters of blkid). A service that has a replica either keeps
+// it ("stay") or is asked to trash it ("trash"). A writable service
+// without one may be asked to pull a copy ("pull"); otherwise nothing
+// happens ("none"). If bal.Dumper is set, one line summarizing the
+// decisions for this block is written to it.
+//
+// ComputeChangeSets calls this from several goroutines at once.
+// NOTE(review): that relies on AddTrash/AddPull being safe for
+// concurrent use; confirm in change_set.go.
+func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
+	debugf("balanceBlock: %v %+v", blkid, blk)
+	// uuids lists service UUIDs in rendezvous order (serviceRoots
+	// maps each UUID to itself).
+	uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
+	// hasRepl maps service UUID to a replica reported by that
+	// service. If one service reports several, the last one listed
+	// in blk.Replicas wins.
+	hasRepl := make(map[string]Replica, len(bal.serviceRoots))
+	for _, repl := range blk.Replicas {
+		hasRepl[repl.UUID] = repl
+		// TODO: when multiple copies are on one server, use
+		// the oldest one that doesn't have a timestamp
+		// collision with other replicas.
+	}
+	// number of replicas already found in positions better than
+	// the position we're contemplating now.
+	reportedBestRepl := 0
+	// To be safe we assume two replicas with the same Mtime are
+	// in fact the same replica being reported more than
+	// once. len(uniqueBestRepl) is the number of distinct
+	// replicas in the best rendezvous positions we've considered
+	// so far.
+	uniqueBestRepl := make(map[int64]bool, len(bal.serviceRoots))
+	// pulls is the number of Pull changes we have already
+	// requested. (For purposes of deciding whether to Pull to
+	// rendezvous position N, we should assume all pulls we have
+	// requested on rendezvous positions M<N will be successful.)
+	pulls := 0
+	// changes collects one "host:port=change,mtime" entry per
+	// service; it is only filled when bal.Dumper is set.
+	var changes []string
+	for _, uuid := range uuids {
+		change := changeNone
+		srv := bal.KeepServices[uuid]
+		// TODO: request a Touch if Mtime is duplicated.
+		repl, ok := hasRepl[srv.UUID]
+		if ok {
+			// This service has a replica. We should
+			// delete it if [1] we already have enough
+			// distinct replicas in better rendezvous
+			// positions and [2] this replica's Mtime is
+			// distinct from all of the better replicas'
+			// Mtimes.
+			// The service must also be writable, and the
+			// replica must be older than bal.MinMtime (now
+			// minus BlobSignatureTTL, per GetCurrentState).
+			if !srv.ReadOnly &&
+				repl.Mtime < bal.MinMtime &&
+				len(uniqueBestRepl) >= blk.Desired &&
+				!uniqueBestRepl[repl.Mtime] {
+				srv.AddTrash(Trash{
+					SizedDigest: blkid,
+					Mtime:       repl.Mtime,
+				})
+				change = changeTrash
+			} else {
+				change = changeStay
+			}
+			uniqueBestRepl[repl.Mtime] = true
+			reportedBestRepl++
+		} else if pulls+reportedBestRepl < blk.Desired &&
+			len(blk.Replicas) > 0 &&
+			!srv.ReadOnly {
+			// This service doesn't have a replica. We
+			// should pull one to this server if we don't
+			// already have enough (existing+requested)
+			// replicas in better rendezvous positions.
+			// NOTE(review): this counts reportedBestRepl (every
+			// replica seen, including ones whose Mtime duplicates
+			// another) rather than len(uniqueBestRepl). That is
+			// less conservative than the trash test above;
+			// confirm it is intended.
+			srv.AddPull(Pull{
+				SizedDigest: blkid,
+				// Pull from the service holding the
+				// first listed replica.
+				Source:      blk.Replicas[0].KeepService,
+			})
+			pulls++
+			change = changePull
+		}
+		if bal.Dumper != nil {
+			// repl is the zero Replica (Mtime 0) when this
+			// service has none.
+			changes = append(changes, fmt.Sprintf("%s:%d=%s,%d", srv.ServiceHost, srv.ServicePort, changeName[change], repl.Mtime))
+		}
+	}
+	if bal.Dumper != nil {
+		bal.Dumper.Printf("%s have=%d want=%d %s", blkid, len(blk.Replicas), blk.Desired, strings.Join(changes, " "))
+	}
+// blocksNBytes is one statistics tally: a number of replicas, the
+// number of blocks they were counted for, and the corresponding total
+// size in bytes.
+type blocksNBytes struct {
+	replicas int
+	blocks   int
+	bytes    int64
+}
+// String renders the tally for log output, e.g.
+// "3 replicas (2 blocks, 9 bytes)".
+func (bb blocksNBytes) String() string {
+	const layout = "%d replicas (%d blocks, %d bytes)"
+	return fmt.Sprintf(layout, bb.replicas, bb.blocks, bb.bytes)
+}
+// balancerStats is the summary produced by getStatistics. The first
+// six tallies classify every block by comparing its current (have)
+// and desired (want) replication. desired and current total the
+// wanted and existing replicas. pulls and trashes count the entries
+// in all ChangeSets.
+type balancerStats struct {
+	lost, overrep, unref, garbage, underrep, justright blocksNBytes
+	desired, current                                   blocksNBytes
+	pulls, trashes                                     int
+}
+// getStatistics classifies every block in bal.BlockStateMap as lost,
+// underreplicated, unreferenced, garbage, overreplicated or just
+// right. It also totals desired and current replicas, and counts the
+// pulls and trashes queued in every KeepService's ChangeSet.
+//
+// An unreferenced block counts as "unref" if any replica is at least
+// as new as bal.MinMtime, and as "garbage" otherwise.
+func (bal *Balancer) getStatistics() (s balancerStats) {
+	bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
+		have, want := len(blk.Replicas), blk.Desired
+		size := blkid.Size()
+		// tally records n replicas of this block in bb.
+		tally := func(bb *blocksNBytes, n int) {
+			bb.replicas += n
+			bb.blocks++
+			bb.bytes += size * int64(n)
+		}
+		switch {
+		case have == 0 && want > 0:
+			tally(&s.lost, want-have)
+		case have < want:
+			tally(&s.underrep, want-have)
+		case have > 0 && want == 0:
+			bucket := &s.garbage
+			for _, r := range blk.Replicas {
+				if r.Mtime >= bal.MinMtime {
+					bucket = &s.unref
+					break
+				}
+			}
+			tally(bucket, have-want)
+		case have > want:
+			tally(&s.overrep, have-want)
+		default:
+			tally(&s.justright, want)
+		}
+		if want > 0 {
+			tally(&s.desired, want)
+		}
+		if have > 0 {
+			tally(&s.current, have)
+		}
+	})
+	for _, srv := range bal.KeepServices {
+		s.pulls += len(srv.ChangeSet.Pulls)
+		s.trashes += len(srv.ChangeSet.Trashes)
+	}
+	return s
+}
+// PrintStatistics logs the block classification, the desired and
+// current totals, and every service's ChangeSet to bal.Logger, in
+// "==="-separated sections. Call it only after ComputeChangeSets has
+// finished.
+func (bal *Balancer) PrintStatistics() {
+	s := bal.getStatistics()
+	rows := []struct {
+		format string
+		value  blocksNBytes
+	}{
+		{"%s lost (0=have<want)", s.lost},
+		{"%s underreplicated (0<have<want)", s.underrep},
+		{"%s just right (have=want)", s.justright},
+		{"%s overreplicated (have>want>0)", s.overrep},
+		{"%s unreferenced (have>want=0, new)", s.unref},
+		{"%s garbage (have>want=0, old)", s.garbage},
+	}
+	bal.logf("===")
+	for _, row := range rows {
+		bal.logf(row.format, row.value)
+	}
+	bal.logf("===")
+	bal.logf("%s total commitment (excluding unreferenced)", s.desired)
+	bal.logf("%s total usage", s.current)
+	bal.logf("===")
+	for _, srv := range bal.KeepServices {
+		bal.logf("%s: %v\n", srv, srv.ChangeSet)
+	}
+	bal.logf("===")
+}
+// CheckSanityLate checks for configuration and runtime errors after
+// GetCurrentState() and ComputeChangeSets() have finished. It logs
+// and rejects deferred collection errors, and rejects a run that saw
+// no collections or no block with desired replication above zero. It
+// also rejects a default replication below 1.
+//
+// If it returns an error, it is dangerous to run any Commit methods.
+func (bal *Balancer) CheckSanityLate() error {
+	if bal.errors != nil {
+		for _, deferred := range bal.errors {
+			bal.logf("deferred error: %v", deferred)
+		}
+		return fmt.Errorf("cannot proceed safely after deferred errors")
+	}
+	if bal.collScanned == 0 {
+		return fmt.Errorf("received zero collections")
+	}
+	wantedBlocks := 0
+	bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
+		if blk.Desired > 0 {
+			wantedBlocks++
+		}
+	})
+	if wantedBlocks == 0 {
+		return fmt.Errorf("zero blocks have desired replication>0")
+	}
+	if bal.DefaultReplication < 1 {
+		return fmt.Errorf("Default replication (%d) is less than 1", bal.DefaultReplication)
+	}
+	// TODO: no two services have identical indexes
+	// TODO: no collisions (same md5, different size)
+	return nil
+}
+// CommitPulls sends each keepstore server its computed pull list, in
+// parallel. This increases replication of existing blocks that are
+// either underreplicated or poorly distributed according to
+// rendezvous hashing.
+func (bal *Balancer) CommitPulls(c *arvados.Client) error {
+	send := func(srv *KeepService) error { return srv.CommitPulls(c) }
+	return bal.commitAsync(c, "send pull list", send)
+}
+// CommitTrash sends each keepstore server its computed trash list, in
+// parallel. This deletes blocks that are overreplicated or
+// unreferenced.
+func (bal *Balancer) CommitTrash(c *arvados.Client) error {
+	send := func(srv *KeepService) error { return srv.CommitTrash(c) }
+	return bal.commitAsync(c, "send trash list", send)
+}
+// commitAsync calls f concurrently for every keep service and waits
+// for all of the calls to finish. Each failure is logged, and the
+// last error received is returned (nil if every call succeeded).
+// label names the operation in the timing log and in error messages.
+func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
+	errs := make(chan error)
+	for _, srv := range bal.KeepServices {
+		go func(srv *KeepService) {
+			var err error
+			// Deferred calls run last-in first-out: the timing
+			// log line is written before the result is sent.
+			defer func() { errs <- err }()
+			srvLabel := fmt.Sprintf("%s: %v", srv, label)
+			defer timeMe(bal.Logger, srvLabel)()
+			if err = f(srv); err != nil {
+				err = fmt.Errorf("%s: %v", srvLabel, err)
+			}
+		}(srv)
+	}
+	// Exactly one result arrives per service, so no close is needed.
+	var lastErr error
+	for range bal.KeepServices {
+		if err := <-errs; err != nil {
+			bal.logf("%v", err)
+			lastErr = err
+		}
+	}
+	return lastErr
+}
+// logf formats a message with Printf semantics and writes it to
+// bal.Logger. It is a no-op when bal.Logger is nil.
+func (bal *Balancer) logf(f string, args ...interface{}) {
+	if bal.Logger == nil {
+		return
+	}
+	bal.Logger.Printf(f, args...)
+}
diff --git a/services/keep-balance/balance_run_test.go b/services/keep-balance/balance_run_test.go
new file mode 100644
index 0000000..a138d91
--- /dev/null
+++ b/services/keep-balance/balance_run_test.go
@@ -0,0 +1,374 @@
+package main
+import (
+	_ "encoding/json"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"log"
+	"net/http"
+	"net/http/httptest"
+	"strings"
+	"sync"
+	"time"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	check "gopkg.in/check.v1"
+var _ = check.Suite(&runSuite{})
+// reqTracker keeps copies of the HTTP requests a stub handler has
+// received. Access goes through its embedded Mutex.
+type reqTracker struct {
+	reqs []http.Request
+	sync.Mutex
+}
+// Count returns how many requests have been recorded so far.
+func (rt *reqTracker) Count() int {
+	rt.Lock()
+	n := len(rt.reqs)
+	rt.Unlock()
+	return n
+}
+// Add records a copy of req and returns the number of requests
+// recorded so far, including this one.
+func (rt *reqTracker) Add(req *http.Request) int {
+	rt.Lock()
+	rt.reqs = append(rt.reqs, *req)
+	n := len(rt.reqs)
+	rt.Unlock()
+	return n
+}
+// stubServer is an HTTP transport that intercepts and processes all
+// requests using its own handlers, registered on mux. srv is an
+// httptest server wrapping the same mux. Requests is a request
+// tracker (updated under mutex), and logf is a logging hook set by
+// the test suite.
+type stubServer struct {
+	mux      *http.ServeMux
+	srv      *httptest.Server
+	mutex    sync.Mutex
+	Requests reqTracker
+	logf     func(string, ...interface{})
+}
+// Start creates the stub's ServeMux and an httptest server wrapping
+// it. It returns an *http.Client whose Transport is s itself, so
+// requests made through that client are dispatched to s.mux by
+// RoundTrip. Test cases attach handlers to s.mux to get the responses
+// they need.
+//
+// A started stubServer should eventually be shut down with Close().
+func (s *stubServer) Start() *http.Client {
+	s.mux = http.NewServeMux()
+	handler := func(w http.ResponseWriter, r *http.Request) {
+		s.mutex.Lock()
+		s.Requests.Add(r)
+		s.mutex.Unlock()
+		w.Header().Set("Content-Type", "application/json")
+		s.mux.ServeHTTP(w, r)
+	}
+	s.srv = httptest.NewServer(http.HandlerFunc(handler))
+	return &http.Client{Transport: s}
+}
+// RoundTrip implements http.RoundTripper by passing req directly to
+// s.mux, with no network I/O.
+//
+// The client returned by Start uses this transport, so requests never
+// reach the handler installed on s.srv. This method therefore does
+// that handler's bookkeeping itself: it records the request in
+// s.Requests and defaults the response Content-Type to
+// application/json. As the RoundTripper contract requires, the
+// request body is closed before returning.
+func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) {
+	if req.Body != nil {
+		defer req.Body.Close()
+	}
+	s.mutex.Lock()
+	s.Requests.Add(req)
+	s.mutex.Unlock()
+	w := httptest.NewRecorder()
+	w.Header().Set("Content-Type", "application/json")
+	s.mux.ServeHTTP(w, req)
+	return &http.Response{
+		StatusCode: w.Code,
+		Status:     fmt.Sprintf("%d %s", w.Code, http.StatusText(w.Code)),
+		Header:     w.HeaderMap,
+		Body:       ioutil.NopCloser(w.Body)}, nil
+}
+// Close shuts down the httptest server created by Start, releasing
+// its resources.
+func (s *stubServer) Close() {
+	srv := s.srv
+	srv.Close()
+}
+// serveStatic registers a handler for path. The handler records each
+// request in the returned tracker, reads and closes any request body,
+// and responds with data.
+func (s *stubServer) serveStatic(path, data string) *reqTracker {
+	tracker := new(reqTracker)
+	handler := func(w http.ResponseWriter, r *http.Request) {
+		tracker.Add(r)
+		if body := r.Body; body != nil {
+			ioutil.ReadAll(body)
+			body.Close()
+		}
+		io.WriteString(w, data)
+	}
+	s.mux.HandleFunc(path, handler)
+	return tracker
+}
+func (s *stubServer) serveCurrentUserAdmin() *reqTracker {
+	return s.serveStatic("/arvados/v1/users/current",
+		`{"uuid":"zzzzz-tpzed-000000000000000","is_admin":true,"is_active":true}`)
+func (s *stubServer) serveCurrentUserNotAdmin() *reqTracker {
+	return s.serveStatic("/arvados/v1/users/current",
+		`{"uuid":"zzzzz-tpzed-000000000000000","is_admin":false,"is_active":true}`)
+func (s *stubServer) serveDiscoveryDoc() *reqTracker {
+	return s.serveStatic("/discovery/v1/apis/arvados/v1/rest",
+		`{"defaultCollectionReplication":2}`)
+func (s *stubServer) serveZeroCollections() *reqTracker {
+	return s.serveStatic("/arvados/v1/collections",
+		`{"items":[],"items_available":0}`)
+// serveFooBarFileCollections makes the collections API return two
+// collections (one holding "bar", one holding "foo"). Requests whose
+// filters mention modified_at get an empty list instead.
+func (s *stubServer) serveFooBarFileCollections() *reqTracker {
+	tracker := &reqTracker{}
+	s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
+		r.ParseForm()
+		tracker.Add(r)
+		if !strings.Contains(r.Form.Get("filters"), `modified_at`) {
+			io.WriteString(w, `{"items_available":2,"items":[
+				{"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
+				{"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
+			return
+		}
+		io.WriteString(w, `{"items_available":0,"items":[]}`)
+	})
+	return tracker
+}
+// serveCollectionsButSkipOne serves the same two collections as
+// serveFooBarFileCollections. However, a "modified_at <=" query
+// claims three collections exist, so a full scan appears to have
+// missed one. A "modified_at >=" query gets an empty list.
+func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
+	tracker := &reqTracker{}
+	s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
+		r.ParseForm()
+		tracker.Add(r)
+		filters := r.Form.Get("filters")
+		switch {
+		case strings.Contains(filters, `"modified_at","\u003c="`):
+			io.WriteString(w, `{"items_available":3,"items":[]}`)
+		case strings.Contains(filters, `"modified_at","\u003e="`):
+			io.WriteString(w, `{"items_available":0,"items":[]}`)
+		default:
+			io.WriteString(w, `{"items_available":2,"items":[
+				{"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
+				{"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
+		}
+	})
+	return tracker
+}
+func (s *stubServer) serveZeroKeepServices() *reqTracker {
+	return s.serveStatic("/arvados/v1/keep_services",
+		`{"items":[],"items_available":0}`)
+func (s *stubServer) serveFourDiskKeepServices() *reqTracker {
+	return s.serveStatic("/arvados/v1/keep_services", `{"items_available":5,"items":[
+		{"uuid":"zzzzz-bi6l4-000000000000000","service_host":"keep0.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
+		{"uuid":"zzzzz-bi6l4-000000000000001","service_host":"keep1.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
+		{"uuid":"zzzzz-bi6l4-000000000000002","service_host":"keep2.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
+		{"uuid":"zzzzz-bi6l4-000000000000003","service_host":"keep3.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
+		{"uuid":"zzzzz-bi6l4-h0a0xwut9qa6g3a","service_host":"keep.zzzzz.arvadosapi.com","service_port":25333,"service_ssl_flag":true,"service_type":"proxy"}]}`)
+// serveKeepstoreIndexFoo4Bar1 serves keepstore indexes in which only
+// keep0 has the "bar" block, while every server has the "foo" block.
+// Each index request reports foo with a distinct timestamp
+// (12345678 plus the request count).
+func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
+	tracker := &reqTracker{}
+	s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
+		n := tracker.Add(r)
+		if r.Host == "keep0.zzzzz.arvadosapi.com:25107" {
+			io.WriteString(w, "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n")
+		}
+		fmt.Fprintf(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n\n", 12345678+n)
+	})
+	return tracker
+}
+func (s *stubServer) serveKeepstoreTrash() *reqTracker {
+	return s.serveStatic("/trash", `{}`)
+func (s *stubServer) serveKeepstorePull() *reqTracker {
+	return s.serveStatic("/pull", `{}`)
+// runSuite exercises Balancer.Run and RunForever against a stub API
+// server and stub keepstores. SetUpTest rebuilds config for each test.
+type runSuite struct {
+	stub   stubServer
+	config Config
+}
+// logger returns a log.Logger whose output is forwarded to c.Log().
+// Text is forwarded one read at a time (up to 10000 bytes), with one
+// trailing newline removed.
+func (s *runSuite) logger(c *check.C) *log.Logger {
+	pr, pw := io.Pipe()
+	go func() {
+		buf := make([]byte, 10000)
+		for {
+			n, err := pr.Read(buf)
+			if n > 0 {
+				c.Log(strings.TrimSuffix(string(buf[:n]), "\n"))
+			}
+			if err != nil {
+				return
+			}
+		}
+	}()
+	return log.New(pw, "", log.LstdFlags)
+}
+// SetUpTest starts the stub server, points a fresh config at it
+// (selecting "disk" services), and serves the discovery document.
+func (s *runSuite) SetUpTest(c *check.C) {
+	httpClient := s.stub.Start()
+	s.config = Config{
+		Client: arvados.Client{
+			AuthToken: "xyzzy",
+			APIHost:   "zzzzz.arvadosapi.com",
+			Client:    httpClient,
+		},
+		KeepServiceTypes: []string{"disk"},
+	}
+	s.stub.serveDiscoveryDoc()
+	s.stub.logf = c.Logf
+}
+// TearDownTest shuts down the stub server started by SetUpTest.
+func (s *runSuite) TearDownTest(c *check.C) {
+	stub := &s.stub
+	stub.Close()
+}
+// TestRefuseZeroCollections checks that Run fails when the API reports
+// no collections. The four disk services still get their trash lists
+// cleared, but no pull list is sent.
+func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
+	s.stub.serveCurrentUserAdmin()
+	s.stub.serveZeroCollections()
+	s.stub.serveFourDiskKeepServices()
+	s.stub.serveKeepstoreIndexFoo4Bar1()
+	trash := s.stub.serveKeepstoreTrash()
+	pull := s.stub.serveKeepstorePull()
+	err := (&Balancer{}).Run(s.config, RunOptions{
+		CommitPulls: true,
+		CommitTrash: true,
+		Logger:      s.logger(c),
+	})
+	c.Check(err, check.ErrorMatches, "received zero collections")
+	c.Check(trash.Count(), check.Equals, 4)
+	c.Check(pull.Count(), check.Equals, 0)
+}
+// TestServiceTypes checks that when no service matches
+// KeepServiceTypes, Run succeeds without requesting any index or
+// sending any trash list.
+func (s *runSuite) TestServiceTypes(c *check.C) {
+	s.config.KeepServiceTypes = []string{"unlisted-type"}
+	s.stub.serveCurrentUserAdmin()
+	s.stub.serveFooBarFileCollections()
+	s.stub.serveFourDiskKeepServices()
+	index := s.stub.serveKeepstoreIndexFoo4Bar1()
+	trash := s.stub.serveKeepstoreTrash()
+	err := (&Balancer{}).Run(s.config, RunOptions{
+		CommitPulls: true,
+		CommitTrash: true,
+		Logger:      s.logger(c),
+	})
+	c.Check(err, check.IsNil)
+	c.Check(index.Count(), check.Equals, 0)
+	c.Check(trash.Count(), check.Equals, 0)
+}
+// TestRefuseNonAdmin checks that Run refuses to operate with a
+// non-admin token, before sending any trash or pull list.
+func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
+	s.stub.serveCurrentUserNotAdmin()
+	s.stub.serveZeroCollections()
+	s.stub.serveFourDiskKeepServices()
+	trash := s.stub.serveKeepstoreTrash()
+	pull := s.stub.serveKeepstorePull()
+	err := (&Balancer{}).Run(s.config, RunOptions{
+		CommitPulls: true,
+		CommitTrash: true,
+		Logger:      s.logger(c),
+	})
+	c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
+	c.Check(trash.Count(), check.Equals, 0)
+	c.Check(pull.Count(), check.Equals, 0)
+}
+// TestDetectSkippedCollections checks that Run fails when the
+// collection scan appears to have missed a collection. The trash
+// lists are still cleared, but no pull list is sent.
+func (s *runSuite) TestDetectSkippedCollections(c *check.C) {
+	s.stub.serveCurrentUserAdmin()
+	s.stub.serveCollectionsButSkipOne()
+	s.stub.serveFourDiskKeepServices()
+	s.stub.serveKeepstoreIndexFoo4Bar1()
+	trash := s.stub.serveKeepstoreTrash()
+	pull := s.stub.serveKeepstorePull()
+	err := (&Balancer{}).Run(s.config, RunOptions{
+		CommitPulls: true,
+		CommitTrash: true,
+		Logger:      s.logger(c),
+	})
+	c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`)
+	c.Check(trash.Count(), check.Equals, 4)
+	c.Check(pull.Count(), check.Equals, 0)
+}
+func (s *runSuite) TestDryRun(c *check.C) {
+	opts := RunOptions{
+		CommitPulls: false,
+		CommitTrash: false,
+		Logger:      s.logger(c),
+	}
+	s.stub.serveCurrentUserAdmin()
+	s.stub.serveFooBarFileCollections()
+	s.stub.serveFourDiskKeepServices()
+	s.stub.serveKeepstoreIndexFoo4Bar1()
+	trashReqs := s.stub.serveKeepstoreTrash()
+	pullReqs := s.stub.serveKeepstorePull()
+	var bal Balancer
+	err := bal.Run(s.config, opts)
+	c.Check(err, check.IsNil)
+	c.Check(trashReqs.Count(), check.Equals, 0)
+	c.Check(pullReqs.Count(), check.Equals, 0)
+	stats := bal.getStatistics()
+	c.Check(stats.pulls, check.Not(check.Equals), 0)
+	c.Check(stats.underrep.replicas, check.Not(check.Equals), 0)
+	c.Check(stats.overrep.replicas, check.Not(check.Equals), 0)
+func (s *runSuite) TestCommit(c *check.C) {
+	opts := RunOptions{
+		CommitPulls: true,
+		CommitTrash: true,
+		Logger:      s.logger(c),
+		Dumper:      s.logger(c),
+	}
+	s.stub.serveCurrentUserAdmin()
+	s.stub.serveFooBarFileCollections()
+	s.stub.serveFourDiskKeepServices()
+	s.stub.serveKeepstoreIndexFoo4Bar1()
+	trashReqs := s.stub.serveKeepstoreTrash()
+	pullReqs := s.stub.serveKeepstorePull()
+	var bal Balancer
+	err := bal.Run(s.config, opts)
+	c.Check(err, check.IsNil)
+	c.Check(trashReqs.Count(), check.Equals, 8)
+	c.Check(pullReqs.Count(), check.Equals, 4)
+	stats := bal.getStatistics()
+	// "foo" block is overreplicated by 2
+	c.Check(stats.trashes, check.Equals, 2)
+	// "bar" block is underreplicated by 1, and its only copy is
+	// in a poor rendezvous position
+	c.Check(stats.pulls, check.Equals, 2)
+// TestRunForever checks that RunForever keeps repeating balance runs
+// until stopped.
+func (s *runSuite) TestRunForever(c *check.C) {
+	s.stub.serveCurrentUserAdmin()
+	s.stub.serveFooBarFileCollections()
+	s.stub.serveFourDiskKeepServices()
+	s.stub.serveKeepstoreIndexFoo4Bar1()
+	trash := s.stub.serveKeepstoreTrash()
+	pull := s.stub.serveKeepstorePull()
+	opts := RunOptions{
+		CommitPulls: true,
+		CommitTrash: true,
+		Logger:      s.logger(c),
+		Dumper:      s.logger(c),
+	}
+	stop := make(chan interface{})
+	s.config.RunPeriod = arvados.Duration(time.Millisecond)
+	go RunForever(s.config, opts, stop)
+	// Each run should send 4 clear trash lists + 4 pull lists + 4
+	// trash lists. We should complete four runs in much less than
+	// a second.
+	deadline := time.Now().Add(10 * time.Second)
+	for pull.Count() < 16 && time.Now().Before(deadline) {
+		time.Sleep(time.Millisecond)
+	}
+	stop <- true
+	c.Check(pull.Count() >= 16, check.Equals, true)
+	c.Check(trash.Count(), check.Equals, 2*pull.Count())
+}
diff --git a/services/keep-balance/balance_test.go b/services/keep-balance/balance_test.go
new file mode 100644
index 0000000..682a5fb
--- /dev/null
+++ b/services/keep-balance/balance_test.go
@@ -0,0 +1,255 @@
+package main
+import (
+	"crypto/md5"
+	"fmt"
+	"sort"
+	"strconv"
+	"testing"
+	"time"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	check "gopkg.in/check.v1"
// Test connects gocheck to "go test": it runs every gocheck suite
// registered in this package with check.Suite.
func Test(t *testing.T) {
	check.TestingT(t)
}
+var _ = check.Suite(&balancerSuite{})
+type balancerSuite struct {
+	Balancer
+	srvs            []*KeepService
+	blks            map[string]tester
+	knownRendezvous [][]int
+	signatureTTL    int64
+const (
+	// index into knownRendezvous
+	known0 = 0
+type slots []int
+type tester struct {
+	known       int
+	desired     int
+	current     slots
+	timestamps  []int64
+	shouldPull  slots
+	shouldTrash slots
+func (bal *balancerSuite) SetUpSuite(c *check.C) {
+	bal.knownRendezvous = nil
+	for _, str := range []string{
+		"3eab2d5fc9681074",
+		"097dba52e648f1c3",
+		"c5b4e023f8a7d691",
+		"9d81c02e76a3bf54",
+	} {
+		var slots []int
+		for _, c := range []byte(str) {
+			pos, _ := strconv.ParseUint(string(c), 16, 4)
+			slots = append(slots, int(pos))
+		}
+		bal.knownRendezvous = append(bal.knownRendezvous, slots)
+	}
+	bal.signatureTTL = 3600
+func (bal *balancerSuite) SetUpTest(c *check.C) {
+	bal.srvs = make([]*KeepService, 16)
+	bal.KeepServices = make(map[string]*KeepService)
+	for i := range bal.srvs {
+		srv := &KeepService{
+			KeepService: arvados.KeepService{
+				UUID: fmt.Sprintf("zzzzz-bi6l4-%015x", i),
+			},
+		}
+		bal.srvs[i] = srv
+		bal.KeepServices[srv.UUID] = srv
+	}
+	bal.MinMtime = time.Now().Unix() - bal.signatureTTL
+func (bal *balancerSuite) TestPerfect(c *check.C) {
+	bal.try(c, tester{
+		desired:     2,
+		current:     slots{0, 1},
+		shouldPull:  nil,
+		shouldTrash: nil})
+func (bal *balancerSuite) TestDecreaseRepl(c *check.C) {
+	bal.try(c, tester{
+		desired:     2,
+		current:     slots{0, 2, 1},
+		shouldTrash: slots{2}})
+func (bal *balancerSuite) TestDecreaseReplToZero(c *check.C) {
+	bal.try(c, tester{
+		desired:     0,
+		current:     slots{0, 1, 3},
+		shouldTrash: slots{0, 1, 3}})
+func (bal *balancerSuite) TestIncreaseRepl(c *check.C) {
+	bal.try(c, tester{
+		desired:    4,
+		current:    slots{0, 1},
+		shouldPull: slots{2, 3}})
+func (bal *balancerSuite) TestSkipReadonly(c *check.C) {
+	bal.srvList(0, slots{3})[0].ReadOnly = true
+	bal.try(c, tester{
+		desired:    4,
+		current:    slots{0, 1},
+		shouldPull: slots{2, 4}})
+func (bal *balancerSuite) TestFixUnbalanced(c *check.C) {
+	bal.try(c, tester{
+		desired:    2,
+		current:    slots{2, 0},
+		shouldPull: slots{1}})
+	bal.try(c, tester{
+		desired:    2,
+		current:    slots{2, 7},
+		shouldPull: slots{0, 1}})
+	// if only one of the pulls succeeds, we'll see this next:
+	bal.try(c, tester{
+		desired:     2,
+		current:     slots{2, 1, 7},
+		shouldPull:  slots{0},
+		shouldTrash: slots{7}})
+	// if both pulls succeed, we'll see this next:
+	bal.try(c, tester{
+		desired:     2,
+		current:     slots{2, 0, 1, 7},
+		shouldTrash: slots{2, 7}})
+	// unbalanced + excessive replication => pull + trash
+	bal.try(c, tester{
+		desired:     2,
+		current:     slots{2, 5, 7},
+		shouldPull:  slots{0, 1},
+		shouldTrash: slots{7}})
+func (bal *balancerSuite) TestIncreaseReplTimestampCollision(c *check.C) {
+	// For purposes of increasing replication, we assume identical
+	// replicas are distinct.
+	bal.try(c, tester{
+		desired:    4,
+		current:    slots{0, 1},
+		timestamps: []int64{12345678, 12345678},
+		shouldPull: slots{2, 3}})
+func (bal *balancerSuite) TestDecreaseReplTimestampCollision(c *check.C) {
+	// For purposes of decreasing replication, we assume identical
+	// replicas are NOT distinct.
+	bal.try(c, tester{
+		desired:    2,
+		current:    slots{0, 1, 2},
+		timestamps: []int64{12345678, 12345678, 12345678}})
+	bal.try(c, tester{
+		desired:    2,
+		current:    slots{0, 1, 2},
+		timestamps: []int64{12345678, 10000000, 10000000}})
+func (bal *balancerSuite) TestDecreaseReplBlockTooNew(c *check.C) {
+	oldTime := bal.MinMtime - 3600
+	newTime := bal.MinMtime + 3600
+	// The excess replica is too new to delete.
+	bal.try(c, tester{
+		desired:    2,
+		current:    slots{0, 1, 2},
+		timestamps: []int64{oldTime, newTime, newTime + 1}})
+	// The best replicas are too new to delete, but the excess
+	// replica is old enough.
+	bal.try(c, tester{
+		desired:     2,
+		current:     slots{0, 1, 2},
+		timestamps:  []int64{newTime, newTime + 1, oldTime},
+		shouldTrash: slots{2}})
// try resets every server's ChangeSet, balances the single known block
// described by t, and checks that exactly the expected pulls and
// trashes were queued. Expected and actual servers are identified by
// rendezvous probe position and compared as sorted lists.
func (bal *balancerSuite) try(c *check.C, t tester) {
	bal.setupServiceRoots()
	blkid := knownBlkid(t.known)
	blk := &BlockState{
		Desired:  t.desired,
		Replicas: bal.replList(t.known, t.current),
	}
	// Override replica mtimes wherever the scenario specifies them.
	for i, mtime := range t.timestamps {
		blk.Replicas[i].Mtime = mtime
	}
	for _, srv := range bal.srvs {
		srv.ChangeSet = &ChangeSet{}
	}
	bal.balanceBlock(blkid, blk)

	// Map each server index to its probe position for this block.
	probePos := make(map[int]int)
	for pos, srvIdx := range bal.knownRendezvous[t.known] {
		probePos[srvIdx] = pos
	}
	var didPull, didTrash slots
	for i, srv := range bal.srvs {
		pos := probePos[i]
		for _, pull := range srv.Pulls {
			didPull = append(didPull, pos)
			c.Check(pull.SizedDigest, check.Equals, blkid)
		}
		for _, trash := range srv.Trashes {
			didTrash = append(didTrash, pos)
			c.Check(trash.SizedDigest, check.Equals, blkid)
		}
	}
	// The order of requests across servers is not significant.
	for _, list := range []slots{didPull, didTrash, t.shouldPull, t.shouldTrash} {
		sort.Ints(list)
	}
	c.Check(didPull, check.DeepEquals, t.shouldPull)
	c.Check(didTrash, check.DeepEquals, t.shouldTrash)
}
+// srvList returns the KeepServices, sorted in rendezvous order and
+// then selected by idx. For example, srvList(3, 0, 1, 4) returns the
+// the first-, second-, and fifth-best servers for storing
+// bal.knownBlkid(3).
+func (bal *balancerSuite) srvList(knownBlockID int, order slots) (srvs []*KeepService) {
+	for _, i := range order {
+		srvs = append(srvs, bal.srvs[bal.knownRendezvous[knownBlockID][i]])
+	}
+	return
// replList is like srvList, but wraps each server in a Replica to build
// a BlockState test fixture. The first replica's Mtime is one day plus
// one signature TTL in the past; each later replica is one second
// newer than the previous one.
func (bal *balancerSuite) replList(knownBlockID int, order slots) (repls []Replica) {
	base := time.Now().Unix() - bal.signatureTTL - 86400
	for i, srv := range bal.srvList(knownBlockID, order) {
		repls = append(repls, Replica{KeepService: srv, Mtime: base + int64(i)})
	}
	return repls
}
// knownBlkid returns the block ID "<md5 hex>+64", where the md5 is
// taken over i formatted as 64 hex digits. These are the same data
// hashes that are tested in sdk/go/keepclient/root_sorter_test.go.
func knownBlkid(i int) arvados.SizedDigest {
	data := fmt.Sprintf("%064x", i)
	sum := md5.Sum([]byte(data))
	return arvados.SizedDigest(fmt.Sprintf("%x+64", sum))
}
diff --git a/services/keep-balance/block_state.go b/services/keep-balance/block_state.go
new file mode 100644
index 0000000..d607386
--- /dev/null
+++ b/services/keep-balance/block_state.go
@@ -0,0 +1,95 @@
+package main
+import (
+	"sync"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+// Replica is a file on disk (or object in an S3 bucket, or blob in an
+// Azure storage container, etc.) as reported in a keepstore index
+// response.
+type Replica struct {
+	*KeepService
+	Mtime int64
+// BlockState indicates the number of desired replicas (according to
+// the collections we know about) and the replicas actually stored
+// (according to the keepstore indexes we know about).
+type BlockState struct {
+	Replicas []Replica
+	Desired  int
+func (bs *BlockState) addReplica(r Replica) {
+	bs.Replicas = append(bs.Replicas, r)
+func (bs *BlockState) increaseDesired(n int) {
+	if bs.Desired < n {
+		bs.Desired = n
+	}
+// BlockStateMap is a goroutine-safe wrapper around a
+// map[arvados.SizedDigest]*BlockState.
+type BlockStateMap struct {
+	entries map[arvados.SizedDigest]*BlockState
+	mutex   sync.Mutex
+// NewBlockStateMap returns a newly allocated BlockStateMap.
+func NewBlockStateMap() *BlockStateMap {
+	return &BlockStateMap{
+		entries: make(map[arvados.SizedDigest]*BlockState),
+	}
+// return a BlockState entry, allocating a new one if needed. (Private
+// method: not goroutine-safe.)
+func (bsm *BlockStateMap) get(blkid arvados.SizedDigest) *BlockState {
+	// TODO? Allocate BlockState structs a slice at a time,
+	// instead of one at a time.
+	blk := bsm.entries[blkid]
+	if blk == nil {
+		blk = &BlockState{}
+		bsm.entries[blkid] = blk
+	}
+	return blk
+// Apply runs f on each entry in the map.
+func (bsm *BlockStateMap) Apply(f func(arvados.SizedDigest, *BlockState)) {
+	bsm.mutex.Lock()
+	defer bsm.mutex.Unlock()
+	for blkid, blk := range bsm.entries {
+		f(blkid, blk)
+	}
+// AddReplicas updates the map to indicate srv has a replica of each
+// block in idx.
+func (bsm *BlockStateMap) AddReplicas(srv *KeepService, idx []arvados.KeepServiceIndexEntry) {
+	bsm.mutex.Lock()
+	defer bsm.mutex.Unlock()
+	for _, ent := range idx {
+		bsm.get(ent.SizedDigest).addReplica(Replica{
+			KeepService: srv,
+			Mtime:       ent.Mtime,
+		})
+	}
+// IncreaseDesired updates the map to indicate the desired replication
+// for the given blocks is at least n.
+func (bsm *BlockStateMap) IncreaseDesired(n int, blocks []arvados.SizedDigest) {
+	bsm.mutex.Lock()
+	defer bsm.mutex.Unlock()
+	for _, blkid := range blocks {
+		bsm.get(blkid).increaseDesired(n)
+	}
diff --git a/services/keep-balance/change_set.go b/services/keep-balance/change_set.go
new file mode 100644
index 0000000..417ea7f
--- /dev/null
+++ b/services/keep-balance/change_set.go
@@ -0,0 +1,75 @@
+package main
+import (
+	"encoding/json"
+	"fmt"
+	"sync"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+// Pull is a request to retrieve a block from a remote server, and
+// store it locally.
+type Pull struct {
+	arvados.SizedDigest
+	Source *KeepService
// MarshalJSON encodes p in keepstore's pull-list format. "locator" is
// the first 32 characters of the digest, and "servers" contains
// Source's base URL as its only element.
func (p Pull) MarshalJSON() ([]byte, error) {
	type KeepstorePullRequest struct {
		Locator string   `json:"locator"`
		Servers []string `json:"servers"`
	}
	req := KeepstorePullRequest{
		Locator: string(p.SizedDigest[:32]),
		Servers: []string{p.Source.URLBase()},
	}
	return json.Marshal(req)
}
+// Trash is a request to delete a block.
+type Trash struct {
+	arvados.SizedDigest
+	Mtime int64
// MarshalJSON encodes t in keepstore's trash-list format. "locator" is
// the bare 32-character hash without the +size hint, and
// "block_mtime" is t.Mtime.
func (t Trash) MarshalJSON() ([]byte, error) {
	type KeepstoreTrashRequest struct {
		Locator    string `json:"locator"`
		BlockMtime int64  `json:"block_mtime"`
	}
	req := KeepstoreTrashRequest{
		Locator:    string(t.SizedDigest[:32]),
		BlockMtime: t.Mtime,
	}
	return json.Marshal(req)
}
+// ChangeSet is a set of change requests that will be sent to a
+// keepstore server.
+type ChangeSet struct {
+	Pulls   []Pull
+	Trashes []Trash
+	mutex   sync.Mutex
+// AddPull adds a Pull operation.
+func (cs *ChangeSet) AddPull(p Pull) {
+	cs.mutex.Lock()
+	cs.Pulls = append(cs.Pulls, p)
+	cs.mutex.Unlock()
+// AddTrash adds a Trash operation
+func (cs *ChangeSet) AddTrash(t Trash) {
+	cs.mutex.Lock()
+	cs.Trashes = append(cs.Trashes, t)
+	cs.mutex.Unlock()
// String implements fmt.Stringer. It reports how many pulls and
// trashes are queued.
func (cs *ChangeSet) String() string {
	cs.mutex.Lock()
	pulls, trashes := len(cs.Pulls), len(cs.Trashes)
	cs.mutex.Unlock()
	return fmt.Sprintf("ChangeSet{Pulls:%d, Trashes:%d}", pulls, trashes)
}
diff --git a/services/keep-balance/change_set_test.go b/services/keep-balance/change_set_test.go
new file mode 100644
index 0000000..b5dcb5c
--- /dev/null
+++ b/services/keep-balance/change_set_test.go
@@ -0,0 +1,35 @@
+package main
+import (
+	"encoding/json"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	check "gopkg.in/check.v1"
+var _ = check.Suite(&changeSetSuite{})
+type changeSetSuite struct{}
// TestJSONFormat checks that Pull and Trash encode in the format that
// keepstore expects: a bare locator with no size hint.
func (s *changeSetSuite) TestJSONFormat(c *check.C) {
	digest := arvados.SizedDigest("acbd18db4cc2f85cedef654fccc4a4d8+3")
	srv := &KeepService{
		KeepService: arvados.KeepService{
			UUID:           "zzzzz-bi6l4-000000000000001",
			ServiceType:    "disk",
			ServiceSSLFlag: false,
			ServiceHost:    "keep1.zzzzz.arvadosapi.com",
			ServicePort:    25107,
		},
	}

	pulls, err := json.Marshal([]Pull{{SizedDigest: digest, Source: srv}})
	c.Check(err, check.IsNil)
	c.Check(string(pulls), check.Equals, `[{"locator":"acbd18db4cc2f85cedef654fccc4a4d8","servers":["http://keep1.zzzzz.arvadosapi.com:25107"]}]`)

	trashes, err := json.Marshal([]Trash{{SizedDigest: digest, Mtime: 123456789}})
	c.Check(err, check.IsNil)
	c.Check(string(trashes), check.Equals, `[{"locator":"acbd18db4cc2f85cedef654fccc4a4d8","block_mtime":123456789}]`)
}
diff --git a/services/keep-balance/collection.go b/services/keep-balance/collection.go
new file mode 100644
index 0000000..e6a1f08
--- /dev/null
+++ b/services/keep-balance/collection.go
@@ -0,0 +1,95 @@
+package main
+import (
+	"fmt"
+	"time"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int, error) {
+	var page arvados.CollectionList
+	var zero int
+	params.Limit = &zero
+	err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
+	return page.ItemsAvailable, err
// EachCollection calls f once for every readable collection, in
// (modified_at, uuid) order, fetching up to 1000 collections per page.
// It stops and returns an error in any of these cases:
//   - a page request fails;
//   - f returns a non-nil error;
//   - a collection has no modified_at timestamp;
//   - an entire page shares one modified_at timestamp, so paging
//     cannot make progress.
//
// The progress function, if non-nil, is called periodically with done
// (the number of times f has been called) and total (the number of
// times f is expected to be called).
//
// After the last page, EachCollection counts the collections modified
// at or before the last timestamp it saw. It returns an error if the
// server now reports more of them than f was called for.
func EachCollection(c *arvados.Client, f func(arvados.Collection) error, progress func(done, total int)) error {
	if progress == nil {
		progress = func(_, _ int) {}
	}

	expectCount, err := countCollections(c, arvados.ResourceListParams{})
	if err != nil {
		return err
	}

	limit := 1000
	params := arvados.ResourceListParams{
		Limit:  &limit,
		Order:  "modified_at, uuid",
		Select: []string{"uuid", "manifest_text", "modified_at", "portable_data_hash", "replication_desired"},
	}
	var last arvados.Collection
	var filterTime time.Time
	callCount := 0
	for {
		progress(callCount, expectCount)
		var page arvados.CollectionList
		err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
		if err != nil {
			return err
		}
		for _, coll := range page.Items {
			if coll.ModifiedAt == nil {
				// Paging depends on modified_at. Without it we
				// can neither skip duplicates nor resume, so
				// fail here instead of dereferencing nil below.
				return fmt.Errorf("collection %s has no modified_at timestamp", coll.UUID)
			}
			// The ">=" filter re-fetches collections that share
			// the previous page's last timestamp; skip the ones
			// already seen.
			if last.ModifiedAt != nil && last.ModifiedAt.Equal(*coll.ModifiedAt) && last.UUID >= coll.UUID {
				continue
			}
			callCount++
			err = f(coll)
			if err != nil {
				return err
			}
			last = coll
		}
		if last.ModifiedAt == nil || last.ModifiedAt.Equal(filterTime) {
			if page.ItemsAvailable > len(page.Items) {
				// TODO: use "mtime=X && UUID>Y"
				// filters to get all collections with
				// this timestamp, then use "mtime>X"
				// to get the next timestamp.
				return fmt.Errorf("BUG: Received an entire page with the same modified_at timestamp (%v), cannot make progress", filterTime)
			}
			break
		}
		filterTime = *last.ModifiedAt
		params.Filters = []arvados.Filter{{
			Attr:     "modified_at",
			Operator: ">=",
			Operand:  filterTime,
		}, {
			Attr:     "uuid",
			Operator: "!=",
			Operand:  last.UUID,
		}}
	}
	progress(callCount, expectCount)

	if checkCount, err := countCollections(c, arvados.ResourceListParams{Filters: []arvados.Filter{{
		Attr:     "modified_at",
		Operator: "<=",
		Operand:  filterTime}}}); err != nil {
		return err
	} else if callCount < checkCount {
		return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, filterTime, checkCount)
	}
	return nil
}
diff --git a/services/keep-balance/integration_test.go b/services/keep-balance/integration_test.go
new file mode 100644
index 0000000..b090614
--- /dev/null
+++ b/services/keep-balance/integration_test.go
@@ -0,0 +1,92 @@
+package main
+import (
+	"bytes"
+	"log"
+	"net/http"
+	"os"
+	"strings"
+	"testing"
+	"time"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+	"git.curoverse.com/arvados.git/sdk/go/keepclient"
+	check "gopkg.in/check.v1"
+var _ = check.Suite(&integrationSuite{})
+type integrationSuite struct {
+	config     Config
+	keepClient *keepclient.KeepClient
// SetUpSuite starts the API server and four keepstore servers, then
// stores "foo" with 4 replicas and "bar" with 1 replica, using the data
// manager token. It is skipped with -short.
func (s *integrationSuite) SetUpSuite(c *check.C) {
	if testing.Short() {
		c.Skip("-short")
	}
	arvadostest.ResetEnv()
	arvadostest.StartAPI()
	arvadostest.StartKeep(4, true)

	arv, err := arvadosclient.MakeArvadosClient()
	// Check the error before using the client value.
	c.Assert(err, check.IsNil)
	arv.ApiToken = arvadostest.DataManagerToken

	s.keepClient = &keepclient.KeepClient{
		Arvados: &arv,
		Client:  &http.Client{},
	}
	c.Assert(s.keepClient.DiscoverKeepServers(), check.IsNil)
	s.putReplicas(c, "foo", 4)
	s.putReplicas(c, "bar", 1)
}
+func (s *integrationSuite) putReplicas(c *check.C, data string, replicas int) {
+	s.keepClient.Want_replicas = replicas
+	_, _, err := s.keepClient.PutB([]byte(data))
+	c.Assert(err, check.IsNil)
// TearDownSuite stops the keepstore and API servers that SetUpSuite
// started. It is skipped with -short.
func (s *integrationSuite) TearDownSuite(c *check.C) {
	if !testing.Short() {
		arvadostest.StopKeep(4)
		arvadostest.StopAPI()
		return
	}
	c.Skip("-short")
}
+func (s *integrationSuite) SetUpTest(c *check.C) {
+	s.config = Config{
+		Client: arvados.Client{
+			APIHost:   os.Getenv("ARVADOS_API_HOST"),
+			AuthToken: arvadostest.DataManagerToken,
+			Insecure:  true,
+		},
+		KeepServiceTypes: []string{"disk"},
+	}
// TestBalanceAPIFixtures runs the balancer repeatedly, up to 20 times,
// against the test fixtures. It checks the first run's change sets,
// stops early once a log mentions "ChangeSet{Pulls:0", and then
// inspects the log of the last run.
func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
	// logBuf holds the log output of the most recent run. It must be
	// assigned inside the loop, not redeclared with ":=". Otherwise
	// the check after the loop inspects a nil buffer.
	var logBuf *bytes.Buffer
	for iter := 0; iter < 20; iter++ {
		logBuf = &bytes.Buffer{}
		opts := RunOptions{
			CommitPulls: true,
			CommitTrash: true,
			Logger:      log.New(logBuf, "", log.LstdFlags),
		}
		err := (&Balancer{}).Run(s.config, opts)
		c.Check(err, check.IsNil)
		if iter == 0 {
			c.Check(logBuf.String(), check.Matches, `(?ms).*ChangeSet{Pulls:1.*`)
			c.Check(logBuf.String(), check.Not(check.Matches), `(?ms).*ChangeSet{.*Trashes:[^0]}*`)
		} else if strings.Contains(logBuf.String(), "ChangeSet{Pulls:0") {
			break
		}
		time.Sleep(200 * time.Millisecond)
	}
	// NOTE(review): the parentheses in this pattern are unescaped, so
	// they form a regexp group and cannot match a literal
	// "(0 blocks, 0 bytes)" in the log. Confirm both the intended
	// pattern and the polarity (Not) of this check.
	c.Check(logBuf.String(), check.Not(check.Matches), `(?ms).*0 replicas (0 blocks, 0 bytes) underreplicated.*`)
}
diff --git a/services/keep-balance/keep_service.go b/services/keep-balance/keep_service.go
new file mode 100644
index 0000000..f65355d
--- /dev/null
+++ b/services/keep-balance/keep_service.go
@@ -0,0 +1,76 @@
+package main
+import (
+	"encoding/json"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"net/http"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+// KeepService represents a keepstore server that is being rebalanced.
+type KeepService struct {
+	arvados.KeepService
+	*ChangeSet
// String implements fmt.Stringer as "uuid (host:port, type)".
func (srv *KeepService) String() string {
	return fmt.Sprintf("%s (%s:%d, %s)",
		srv.UUID, srv.ServiceHost, srv.ServicePort, srv.ServiceType)
}
// ksSchemes maps a service's ServiceSSLFlag to its URL scheme.
var ksSchemes = map[bool]string{
	false: "http",
	true:  "https",
}
// URLBase returns "scheme://host:port" for this server. The scheme is
// chosen by ServiceSSLFlag.
func (srv *KeepService) URLBase() string {
	scheme := ksSchemes[srv.ServiceSSLFlag]
	return fmt.Sprintf("%s://%s:%d", scheme, srv.ServiceHost, srv.ServicePort)
}
+// CommitPulls sends the current list of pull requests to the storage
+// server (even if the list is empty).
+func (srv *KeepService) CommitPulls(c *arvados.Client) error {
+	return srv.put(c, "pull", srv.ChangeSet.Pulls)
+// CommitTrash sends the current list of trash requests to the storage
+// server (even if the list is empty).
+func (srv *KeepService) CommitTrash(c *arvados.Client) error {
+	return srv.put(c, "trash", srv.ChangeSet.Trashes)
// put sends data, JSON-encoded, as the body of a PUT request to
// URLBase()+"/"+path. It returns any error from encoding the data,
// building the request, or performing it. Encoding errors take
// precedence, because a badly encoded request makes the response
// meaningless.
func (srv *KeepService) put(c *arvados.Client, path string, data interface{}) error {
	// We'll start a goroutine to do the JSON encoding, so we can
	// stream it to the http client through a Pipe, rather than
	// keeping the entire encoded version in memory.
	jsonR, jsonW := io.Pipe()
	// errC communicates any encoding errors back to our main
	// goroutine.
	errC := make(chan error, 1)
	go func() {
		enc := json.NewEncoder(jsonW)
		err := enc.Encode(data)
		// CloseWithError(nil) behaves like Close. A non-nil err
		// makes the reader (the HTTP transport) fail, instead of
		// seeing a clean EOF after a truncated body.
		jsonW.CloseWithError(err)
		errC <- err
	}()

	url := srv.URLBase() + "/" + path
	req, err := http.NewRequest("PUT", url, ioutil.NopCloser(jsonR))
	if err != nil {
		err = fmt.Errorf("building request for %s: %v", url, err)
	} else {
		err = c.DoAndDecode(nil, req)
	}
	// The body is wrapped in NopCloser, so the transport cannot close
	// the pipe for us. If the request was never sent, failed, or got
	// a response before the whole body was consumed, the encoder
	// goroutine would block forever on its write, and so would our
	// receive from errC. Closing the read side makes any pending or
	// future write fail with io.ErrClosedPipe instead.
	jsonR.Close()

	// If there was an error encoding the request body, report
	// that instead of the response: obviously we won't get a
	// useful response if our request wasn't properly encoded.
	// io.ErrClosedPipe only means we stopped reading early (see
	// above); any request failure is already in err.
	if encErr := <-errC; encErr != nil && encErr != io.ErrClosedPipe {
		return fmt.Errorf("encoding data for %s: %v", url, encErr)
	}
	return err
}
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
new file mode 100644
index 0000000..42a8d63
--- /dev/null
+++ b/services/keep-balance/main.go
@@ -0,0 +1,156 @@
+package main
+import (
+	"encoding/json"
+	"flag"
+	"io/ioutil"
+	"log"
+	"os"
+	"os/signal"
+	"syscall"
+	"time"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+// Config specifies site configuration, like API credentials and the
+// choice of which servers are to be balanced.
+// Config is loaded from a JSON config file (see usage()).
+type Config struct {
+	// Arvados API endpoint and credentials.
+	Client arvados.Client
+	// List of service types (e.g., "disk") to balance.
+	KeepServiceTypes []string
+	KeepServiceList arvados.KeepServiceList
+	// How often to check
+	RunPeriod arvados.Duration
+// RunOptions controls runtime behavior. The flags/options that belong
+// here are the ones that are useful for interactive use. For example,
+// "CommitTrash" is a runtime option rather than a config item because
+// it invokes a troubleshooting feature rather than expressing how
+// balancing is meant to be done at a given site.
+// RunOptions fields are controlled by command line flags.
+type RunOptions struct {
+	Once        bool
+	CommitPulls bool
+	CommitTrash bool
+	Logger      *log.Logger
+	Dumper      *log.Logger
// debugf logs debug messages. It discards them unless main() sees the
// -debug flag, in which case it is replaced with log.Printf.
var debugf = func(string, ...interface{}) {}
// main parses flags, loads the config file (and optionally a separate
// keep service list), validates them, and then either balances once
// (-once) or runs forever. It exits via log.Fatal on any error.
func main() {
	var config Config
	var runOptions RunOptions

	configPath := flag.String("config", "",
		"`path` of json configuration file")
	serviceListPath := flag.String("config.KeepServiceList", "",
		"`path` of json file with list of keep services to balance, as given by \"arv keep_service list\" "+
			"(default: config[\"KeepServiceList\"], or if none given, get all available services and filter by config[\"KeepServiceTypes\"])")
	flag.BoolVar(&runOptions.Once, "once", false,
		"balance once and then exit")
	flag.BoolVar(&runOptions.CommitPulls, "commit-pulls", false,
		"send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
	flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
		"send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
	dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout")
	debugFlag := flag.Bool("debug", false, "enable debug messages")
	flag.Usage = usage
	flag.Parse()

	if *configPath == "" {
		log.Fatal("You must specify a config file (see `keep-balance -help`)")
	}
	mustReadJSON(&config, *configPath)
	if *serviceListPath != "" {
		mustReadJSON(&config.KeepServiceList, *serviceListPath)
	}

	if *debugFlag {
		debugf = log.Printf
		// Log a copy of the config with the API token masked, so
		// that the credential does not end up in log files.
		redacted := config
		if redacted.Client.AuthToken != "" {
			redacted.Client.AuthToken = "xxxxxx"
		}
		if j, err := json.Marshal(redacted); err != nil {
			log.Fatal(err)
		} else {
			log.Printf("config is %s", j)
		}
	}
	if *dumpFlag {
		runOptions.Dumper = log.New(os.Stdout, "", log.LstdFlags)
	}

	err := CheckConfig(config, runOptions)
	if err == nil {
		if runOptions.Once {
			err = (&Balancer{}).Run(config, runOptions)
		} else {
			err = RunForever(config, runOptions, nil)
		}
	}
	if err != nil {
		log.Fatal(err)
	}
}
// mustReadJSON decodes the JSON file at path into dst. On a read or
// decode error it calls log.Fatalf, which exits the program.
func mustReadJSON(dst interface{}, path string) {
	buf, err := ioutil.ReadFile(path)
	if err != nil {
		log.Fatalf("Reading %q: %v", path, err)
	}
	if err := json.Unmarshal(buf, dst); err != nil {
		log.Fatalf("Decoding %q: %v", path, err)
	}
}
// RunForever runs a balancing pass every config.RunPeriod, and also
// when SIGUSR1 arrives between runs. It runs forever or, for testing,
// until it can receive from stop; then it returns nil. Errors from
// individual runs are logged, not returned. If runOptions.Logger is
// nil, it logs to stderr.
//
// NOTE(review): time.NewTicker panics if RunPeriod is not positive.
// Presumably CheckConfig rejects that; confirm.
func RunForever(config Config, runOptions RunOptions, stop <-chan interface{}) error {
	if runOptions.Logger == nil {
		runOptions.Logger = log.New(os.Stderr, "", log.LstdFlags)
	}
	logger := runOptions.Logger

	ticker := time.NewTicker(time.Duration(config.RunPeriod))
	// ticker is replaced on SIGUSR1, so stop whichever ticker is
	// current when we return, not the first one.
	defer func() { ticker.Stop() }()

	// The unbuffered channel here means we only hear SIGUSR1 if
	// it arrives while we're waiting in select{}.
	sigUSR1 := make(chan os.Signal)
	signal.Notify(sigUSR1, syscall.SIGUSR1)
	defer signal.Stop(sigUSR1)

	logger.Printf("starting up: will scan every %v and on SIGUSR1", config.RunPeriod)

	for {
		if !runOptions.CommitPulls && !runOptions.CommitTrash {
			logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
			logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
		}

		err := (&Balancer{}).Run(config, runOptions)
		if err != nil {
			logger.Print("run failed: ", err)
		} else {
			logger.Print("run succeeded")
		}

		select {
		case <-stop:
			return nil
		case <-ticker.C:
			logger.Print("timer went off")
		case <-sigUSR1:
			logger.Print("received SIGUSR1, resetting timer")
			// Reset the timer so we don't start the N+1st
			// run too soon after the Nth run is triggered
			// by SIGUSR1.
			ticker.Stop()
			ticker = time.NewTicker(time.Duration(config.RunPeriod))
		}
		logger.Print("starting next run")
	}
}
diff --git a/services/keep-balance/main_test.go b/services/keep-balance/main_test.go
new file mode 100644
index 0000000..4a56098
--- /dev/null
+++ b/services/keep-balance/main_test.go
@@ -0,0 +1,43 @@
+package main
+import (
+	"encoding/json"
+	"time"
+	check "gopkg.in/check.v1"
+var _ = check.Suite(&mainSuite{})
+type mainSuite struct{}
// TestExampleJSON checks that the example config shown in the usage
// message parses into the expected Config.
func (s *mainSuite) TestExampleJSON(c *check.C) {
	var config Config
	err := json.Unmarshal(exampleConfigFile, &config)
	c.Check(err, check.IsNil)
	c.Check(config.KeepServiceTypes, check.DeepEquals, []string{"disk"})
	c.Check(config.Client.AuthToken, check.Equals, "xyzzy")
	c.Check(time.Duration(config.RunPeriod), check.Equals, 600*time.Second)
}
// TestConfigJSONWithKeepServiceList checks that a config with an
// explicit KeepServiceList parses correctly.
func (s *mainSuite) TestConfigJSONWithKeepServiceList(c *check.C) {
	const configJSON = `
		{
		    "Client": {
			"APIHost": "zzzzz.arvadosapi.com:443",
			"AuthToken": "xyzzy",
			"Insecure": false
		    },
		    "KeepServiceList": {
			"items": [
			    {"uuid":"zzzzz-bi64l-abcdefghijklmno", "service_type":"disk", "service_host":"a.zzzzz.arvadosapi.com", "service_port":12345},
			    {"uuid":"zzzzz-bi64l-bcdefghijklmnop", "service_type":"blob", "service_host":"b.zzzzz.arvadosapi.com", "service_port":12345}
			]
		    },
		    "RunPeriod": "600s"
		}`
	var config Config
	c.Check(json.Unmarshal([]byte(configJSON), &config), check.IsNil)
	c.Assert(config.KeepServiceList.Items, check.HasLen, 2)
	first := config.KeepServiceList.Items[0]
	c.Check(first.UUID, check.Equals, "zzzzz-bi64l-abcdefghijklmno")
	c.Check(first.ServicePort, check.Equals, 12345)
	c.Check(config.Client.AuthToken, check.Equals, "xyzzy")
}
diff --git a/services/keep-balance/time_me.go b/services/keep-balance/time_me.go
new file mode 100644
index 0000000..e5f16b7
--- /dev/null
+++ b/services/keep-balance/time_me.go
@@ -0,0 +1,14 @@
+package main
+import (
+	"log"
+	"time"
// timeMe logs "<label>: start" right away and returns a function that,
// when called, logs "<label>: took <duration>", measured from the call
// to timeMe. Typical use: defer timeMe(logger, "step")().
func timeMe(logger *log.Logger, label string) func() {
	start := time.Now()
	logger.Printf("%s: start", label)
	return func() {
		elapsed := time.Since(start)
		logger.Printf("%s: took %v", label, elapsed)
	}
}
diff --git a/services/keep-balance/usage.go b/services/keep-balance/usage.go
new file mode 100644
index 0000000..eb9990c
--- /dev/null
+++ b/services/keep-balance/usage.go
@@ -0,0 +1,83 @@
+package main
+import (
+	"flag"
+	"fmt"
+	"os"
+var exampleConfigFile = []byte(`
+    {
+	"Client": {
+	    "APIHost": "zzzzz.arvadosapi.com:443",
+	    "AuthToken": "xyzzy",
+	    "Insecure": false
+	},
+	"KeepServiceTypes": [
+	    "disk"
+	],
+	"RunPeriod": "600s"
+    }`)
+func usage() {
+	fmt.Fprintf(os.Stderr, `
+keep-balance rebalances a set of keepstore servers. It creates new
+copies of underreplicated blocks, deletes excess copies of
+overreplicated and unreferenced blocks, and moves blocks to better
+positions (according to the rendezvous hash algorithm) so clients find
+them faster.
+Usage: keep-balance -config path/to/config.json [options]
+	flag.PrintDefaults()
+	fmt.Fprintf(os.Stderr, `
+Example config file:
+    Client.AuthToken must be recognized by Arvados as an admin token,
+    and must be recognized by all Keep services as a "data manager
+    key".
+    Client.Insecure should be true if your Arvados API endpoint uses
+    an unverifiable SSL/TLS certificate.
+Periodic scanning:
+    By default, keep-balance operates periodically, i.e.: do a
+    scan/balance operation, sleep, repeat.
+    RunPeriod determines the interval between start times of
+    successive scan/balance operations. If a scan/balance operation
+    takes longer than RunPeriod, the next one will follow it
+    immediately.
+    If SIGUSR1 is received during an idle period between operations,
+    the next operation will start immediately.
+One-time scanning:
+    Use the -once flag to do a single operation and then exit. The
+    exit code will be zero if the operation was successful.
+    By default, keep-service computes and reports changes but does not
+    implement them by sending pull and trash lists to the Keep
+    services.
+    Use the -commit-pull and -commit-trash flags to implement the
+    computed changes.
+    keep-balance does not attempt to discover whether committed pull
+    and trash requests ever get carried out -- only that they are
+    accepted by the Keep services. If some services are full, new
+    copies of underreplicated blocks might never get made, only
+    repeatedly requested.
+`, exampleConfigFile)
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 80d8670..d7da67c 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -4,6 +4,7 @@ import (
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
@@ -331,8 +332,12 @@ func main() {
 	// Initialize Pull queue and worker
+	arv, err := arvadosclient.MakeArvadosClient()
+	if err != nil {
+		log.Fatalf("MakeArvadosClient: %s", err)
+	}
 	keepClient := &keepclient.KeepClient{
-		Arvados:       nil,
+		Arvados:       &arv,
 		Want_replicas: 1,
 		Client:        &http.Client{},



More information about the arvados-commits mailing list