[ARVADOS] updated: ed7489197a4ec63c8f31214b87e7b96185fa15e5

Git user git at public.curoverse.com
Mon May 16 10:01:07 EDT 2016


Summary of changes:
 build/run-tests.sh                                 |   2 +
 .../keep-balance => sdk/go/x/arvados}/client.go    |   3 +-
 sdk/go/x/arvados/client_test.go                    |  83 +++++++++++++++
 .../go/x/arvados}/collection.go                    |   2 +-
 sdk/go/x/arvados/keep_block.go                     |  14 +++
 .../go/x/arvados/keep_service.go                   |  40 ++++---
 sdk/go/x/arvados/user.go                           |  14 +++
 services/keep-balance/balance.go                   | 117 +++++++++++++--------
 services/keep-balance/block_state.go               |  26 ++---
 services/keep-balance/change_set.go                |   8 +-
 services/keep-balance/example-config.json          |   6 +-
 services/keep-balance/keep_service.go              |   5 +-
 services/keep-balance/main.go                      |  26 +++--
 13 files changed, 247 insertions(+), 99 deletions(-)
 rename {services/keep-balance => sdk/go/x/arvados}/client.go (99%)
 create mode 100644 sdk/go/x/arvados/client_test.go
 rename {services/keep-balance => sdk/go/x/arvados}/collection.go (99%)
 create mode 100644 sdk/go/x/arvados/keep_block.go
 rename services/keep-balance/keep_service_model.go => sdk/go/x/arvados/keep_service.go (73%)
 create mode 100644 sdk/go/x/arvados/user.go

  discards  d86afd81c109b73abc4511c65a9584084374395d (commit)
       via  ed7489197a4ec63c8f31214b87e7b96185fa15e5 (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the old revision is not a strict subset of the new revision.  This
situation occurs when you --force push a change and generate a repository
containing something like this:

 * -- * -- B -- O -- O -- O (d86afd81c109b73abc4511c65a9584084374395d)
            \
             N -- N -- N (ed7489197a4ec63c8f31214b87e7b96185fa15e5)

When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit ed7489197a4ec63c8f31214b87e7b96185fa15e5
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon May 16 00:05:48 2016 -0400

    9162: Add keep-balance

diff --git a/build/run-tests.sh b/build/run-tests.sh
index d229341..fbbedf6 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -70,6 +70,7 @@ services/fuse
 services/keep-web
 services/keepproxy
 services/keepstore
+services/keep-balance
 services/login-sync
 services/nodemanager
 services/crunch-run
@@ -86,6 +87,7 @@ sdk/go/manifest
 sdk/go/blockdigest
 sdk/go/streamer
 sdk/go/crunchrunner
+sdk/go/x/arvados
 sdk/cwl
 tools/crunchstat-summary
 tools/keep-rsync
@@ -709,11 +711,13 @@ gostuff=(
     sdk/go/manifest
     sdk/go/streamer
     sdk/go/crunchrunner
+    sdk/go/x/arvados
     services/arv-git-httpd
     services/crunchstat
     services/keep-web
     services/keepstore
     sdk/go/keepclient
+    services/keep-balance
     services/keepproxy
     services/datamanager/summary
     services/datamanager/collection
diff --git a/sdk/go/x/arvados/client.go b/sdk/go/x/arvados/client.go
new file mode 100644
index 0000000..9af7392
--- /dev/null
+++ b/sdk/go/x/arvados/client.go
@@ -0,0 +1,132 @@
+package arvados
+
+import (
+	"encoding/json"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"net/http"
+	"net/url"
+	"os"
+)
+
+type Client struct {
+	*http.Client
+	APIHost   string
+	AuthToken string
+	Insecure  bool
+}
+
+func NewClientFromEnv() *Client {
+	return &Client{
+		APIHost:   os.Getenv("ARVADOS_API_HOST"),
+		AuthToken: os.Getenv("ARVADOS_API_TOKEN"),
+		Insecure:  os.Getenv("ARVADOS_API_HOST_INSECURE") != "",
+	}
+}
+
+// Do adds authentication headers and then calls (*http.Client)Do().
+func (c *Client) Do(req *http.Request) (*http.Response, error) {
+	if c.AuthToken != "" {
+		req.Header.Add("Authorization", "OAuth2 "+c.AuthToken)
+	}
+	return c.httpClient().Do(req)
+}
+
+// DoAndDecode performs req and unmarshals the response (which must be
+// JSON) into dst. Use this instead of RequestAndDecode if you need
+// more control of the http.Request object.
+func (c *Client) DoAndDecode(dst interface{}, req *http.Request) error {
+	resp, err := c.Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	buf, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return err
+	}
+	if resp.StatusCode != 200 {
+		return fmt.Errorf("request failed (%s): %s", req.URL, resp.Status)
+	}
+	return json.Unmarshal(buf, dst)
+}
+
+// RequestAndDecode performs an API request and unmarshals the
+// response (which must be JSON) into dst. Method and body arguments
+// are the same as for http.NewRequest(). The given path is added to
+// the server's scheme/host/port to form the request URL. The given
+// params are passed via POST form or query string.
+//
+// path must not contain a query string.
+func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error {
+	urlString := c.apiURL(path)
+	var urlValues url.Values
+	if v, ok := params.(url.Values); ok {
+		urlValues = v
+	} else if params != nil {
+		// Convert an arbitrary struct to url.Values. For
+		// example, Foo{Bar: []int{1,2,3}, Baz: "waz"} becomes
+		// url.Values{`bar`:`{"a":[1,2,3]}`,`Baz`:`waz`}
+		//
+		// TODO: Do this more efficiently, possibly using
+		// json.Decode/Encode, so the whole thing doesn't have
+		// to get encoded, decoded, and re-encoded.
+		j, err := json.Marshal(params)
+		if err != nil {
+			return err
+		}
+		var generic map[string]interface{}
+		err = json.Unmarshal(j, &generic)
+		if err != nil {
+			return err
+		}
+		urlValues = url.Values{}
+		for k, v := range generic {
+			if v, ok := v.(string); ok {
+				urlValues.Set(k, v)
+				continue
+			}
+			j, err := json.Marshal(v)
+			if err != nil {
+				return err
+			}
+			urlValues.Set(k, string(j))
+		}
+	}
+	if (method == "GET" || body != nil) && urlValues != nil {
+		// FIXME: what if params don't fit in URL
+		u, err := url.Parse(urlString)
+		if err != nil {
+			return err
+		}
+		u.RawQuery = urlValues.Encode()
+		urlString = u.String()
+	}
+	req, err := http.NewRequest(method, urlString, body)
+	if err != nil {
+		return err
+	}
+	return c.DoAndDecode(dst, req)
+}
+
+func (c *Client) httpClient() *http.Client {
+	if c.Client == nil {
+		return http.DefaultClient
+	}
+	return c.Client
+}
+
+func (c *Client) apiURL(path string) string {
+	return "https://" + c.APIHost + "/" + path
+}
+
+type DiscoveryDocument struct {
+	DefaultCollectionReplication int   `json:"defaultCollectionReplication"`
+	BlobSignatureTTL             int64 `json:"blobSignatureTtl"`
+}
+
+func (c *Client) DiscoveryDocument() (*DiscoveryDocument, error) {
+	var dd DiscoveryDocument
+	return &dd, c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil)
+}
diff --git a/sdk/go/x/arvados/client_test.go b/sdk/go/x/arvados/client_test.go
new file mode 100644
index 0000000..2db50bf
--- /dev/null
+++ b/sdk/go/x/arvados/client_test.go
@@ -0,0 +1,83 @@
+package arvados
+
+import (
+	"bytes"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"sync"
+	"testing"
+)
+
+type stubTransport struct {
+	Responses map[string]string
+	Requests  []http.Request
+	sync.Mutex
+}
+
+func (stub *stubTransport) RoundTrip(req *http.Request) (*http.Response, error) {
+	stub.Lock()
+	stub.Requests = append(stub.Requests, *req)
+	stub.Unlock()
+
+	resp := &http.Response{
+		Status:     "200 OK",
+		StatusCode: 200,
+		Proto:      "HTTP/1.1",
+		ProtoMajor: 1,
+		ProtoMinor: 1,
+		Request:    req,
+	}
+	str := stub.Responses[req.URL.Path]
+	if str == "" {
+		resp.Status = "404 Not Found"
+		resp.StatusCode = 404
+		str = "{}"
+	}
+	buf := bytes.NewBufferString(str)
+	resp.Body = ioutil.NopCloser(buf)
+	resp.ContentLength = int64(buf.Len())
+	return resp, nil
+}
+
+type errorTransport struct{}
+
+func (stub *errorTransport) RoundTrip(req *http.Request) (*http.Response, error) {
+	return nil, fmt.Errorf("something awful happened")
+}
+
+func TestCurrentUser(t *testing.T) {
+	t.Parallel()
+	stub := &stubTransport{
+		Responses: map[string]string{
+			"/arvados/v1/users/current": `{"uuid":"zzzzz-abcde-012340123401234"}`,
+		},
+	}
+	c := &Client{
+		Client: &http.Client{
+			Transport: stub,
+		},
+		APIHost:   "zzzzz.arvadosapi.com",
+		AuthToken: "xyzzy",
+	}
+	u, err := c.CurrentUser()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if x := "zzzzz-abcde-012340123401234"; u.UUID != x {
+		t.Errorf("got uuid %q, expected %q", u.UUID, x)
+	}
+	if len(stub.Requests) < 1 {
+		t.Fatal("empty stub.Requests")
+	}
+	hdr := stub.Requests[len(stub.Requests)-1].Header
+	if hdr.Get("Authorization") != "OAuth2 xyzzy" {
+		t.Errorf("got headers %+q, expected Authorization header", hdr)
+	}
+
+	c.Client.Transport = &errorTransport{}
+	u, err = c.CurrentUser()
+	if err == nil {
+		t.Errorf("got nil error, expected something awful")
+	}
+}
diff --git a/sdk/go/x/arvados/collection.go b/sdk/go/x/arvados/collection.go
new file mode 100644
index 0000000..602b02c
--- /dev/null
+++ b/sdk/go/x/arvados/collection.go
@@ -0,0 +1,155 @@
+package arvados
+
+import (
+	"bufio"
+	"encoding/json"
+	"fmt"
+	"strings"
+	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/manifest"
+)
+
+type Collection struct {
+	UUID                   string     `json:"uuid,omitempty"`
+	ExpiresAt              *time.Time `json:"expires_at,omitempty"`
+	ManifestText           string     `json:"manifest_text,omitempty"`
+	CreatedAt              *time.Time `json:"created_at,omitempty"`
+	ModifiedAt             *time.Time `json:"modified_at,omitempty"`
+	PortableDataHash       string     `json:"portable_data_hash,omitempty"`
+	ReplicationConfirmed   *int       `json:"replication_confirmed,omitempty"`
+	ReplicationConfirmedAt *time.Time `json:"replication_confirmed_at,omitempty"`
+	ReplicationDesired     *int       `json:"replication_desired,omitempty"`
+}
+
+func (c *Collection) SizedDigests() ([]SizedDigest, error) {
+	if c.ManifestText == "" && c.PortableDataHash != "d41d8cd98f00b204e9800998ecf8427e+0" {
+		// TODO: Check more subtle forms of corruption, too
+		return nil, fmt.Errorf("manifest is missing")
+	}
+	var sds []SizedDigest
+	scanner := bufio.NewScanner(strings.NewReader(c.ManifestText))
+	scanner.Buffer(make([]byte, 1048576), len(c.ManifestText))
+	for scanner.Scan() {
+		line := scanner.Text()
+		tokens := strings.Split(line, " ")
+		if len(tokens) < 3 {
+			return nil, fmt.Errorf("Invalid stream (<3 tokens): %q", line)
+		}
+		for _, token := range tokens[1:] {
+			if !manifest.LocatorPattern.MatchString(token) {
+				// FIXME: ensure it's a file token
+				break
+			}
+			// FIXME: shouldn't assume 32 char hash
+			if i := strings.IndexRune(token[33:], '+'); i >= 0 {
+				token = token[:33+i]
+			}
+			sds = append(sds, SizedDigest(token))
+		}
+	}
+	return sds, scanner.Err()
+}
+
+type CollectionList struct {
+	Items          []Collection `json:"items"`
+	ItemsAvailable int          `json:"items_available"`
+	Offset         int          `json:"offset"`
+	Limit          int          `json:"limit"`
+}
+
+type Filter struct {
+	Attr     string
+	Operator string
+	Operand  interface{}
+}
+
+func (f *Filter) MarshalJSON() ([]byte, error) {
+	return json.Marshal([]interface{}{f.Attr, f.Operator, f.Operand})
+}
+
+type ResourceListParams struct {
+	Select  []string `json:"select,omitempty"`
+	Filters []Filter `json:"filters,omitempty"`
+	Limit   *int     `json:"limit,omitempty"`
+	Offset  int      `json:"offset,omitempty"`
+	Order   string   `json:"order,omitempty"`
+}
+
+// EachCollection calls f once for every readable
+// collection. EachCollection stops if it encounters an error, such as
+// f returning a non-nil error.
+//
+// The progress function is called periodically with done (number of
+// times f has been called) and total (number of times f is expected
+// to be called).
+func (c *Client) EachCollection(f func(Collection) error, progress func(done, total int)) error {
+	if progress == nil {
+		progress = func(_, _ int) {}
+	}
+
+	var expectCount int
+	{
+		var page CollectionList
+		var zero int
+		err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, ResourceListParams{Limit: &zero})
+		if err != nil {
+			return err
+		}
+		expectCount = page.ItemsAvailable
+	}
+
+	limit := 1000
+	params := ResourceListParams{
+		Limit:  &limit,
+		Order:  "modified_at, uuid",
+		Select: []string{"uuid", "manifest_text", "modified_at", "portable_data_hash"},
+	}
+	var last Collection
+	var filterTime time.Time
+	callCount := 0
+	for {
+		progress(callCount, expectCount)
+		var page CollectionList
+		err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
+		if err != nil {
+			return err
+		}
+		for _, coll := range page.Items {
+			if last.ModifiedAt != nil && *last.ModifiedAt == *coll.ModifiedAt && last.UUID >= coll.UUID {
+				continue
+			}
+			callCount++
+			err = f(coll)
+			if err != nil {
+				return err
+			}
+			last = coll
+		}
+		if last.ModifiedAt != nil && *last.ModifiedAt == filterTime {
+			if page.ItemsAvailable > len(page.Items) {
+				// TODO: use "mtime=X && UUID>Y"
+				// filters to get all collections with
+				// this timestamp, then use "mtime>X"
+				// to get the next timestamp.
+				return fmt.Errorf("BUG: Received an entire page with the same modified_at timestamp (%v), cannot make progress", filterTime)
+			}
+			break
+		}
+		filterTime = *last.ModifiedAt
+		params.Filters = []Filter{{
+			Attr:     "modified_at",
+			Operator: ">=",
+			Operand:  filterTime,
+		}, {
+			Attr:     "uuid",
+			Operator: "!=",
+			Operand:  last.UUID,
+		}}
+	}
+	progress(callCount, expectCount)
+	if callCount < expectCount {
+		return fmt.Errorf("Retrieved only %d collections, expected %d", callCount, expectCount)
+	}
+	return nil
+}
diff --git a/sdk/go/x/arvados/keep_block.go b/sdk/go/x/arvados/keep_block.go
new file mode 100644
index 0000000..68ef1d5
--- /dev/null
+++ b/sdk/go/x/arvados/keep_block.go
@@ -0,0 +1,14 @@
+package arvados
+
+import (
+	"strconv"
+	"strings"
+)
+
+// SizedDigest is hash+size
+type SizedDigest string
+
+func (sd SizedDigest) Size() int64 {
+	n, _ := strconv.ParseInt(strings.Split(string(sd), "+")[1], 10, 64)
+	return n
+}
diff --git a/sdk/go/x/arvados/keep_service.go b/sdk/go/x/arvados/keep_service.go
new file mode 100644
index 0000000..ff4a699
--- /dev/null
+++ b/sdk/go/x/arvados/keep_service.go
@@ -0,0 +1,121 @@
+package arvados
+
+import (
+	"bufio"
+	"fmt"
+	"net/http"
+	"strconv"
+	"strings"
+)
+
+type KeepService struct {
+	UUID           string `json:"uuid"`
+	ServiceHost    string `json:"service_host"`
+	ServicePort    int    `json:"service_port"`
+	ServiceSSLFlag bool   `json:"service_ssl_flag"`
+	ServiceType    string `json:"service_type"`
+	ReadOnly       bool   `json:"read_only"`
+}
+
+type KeepServiceList struct {
+	Items          []KeepService `json:"items"`
+	ItemsAvailable int           `json:"items_available"`
+	Offset         int           `json:"offset"`
+	Limit          int           `json:"limit"`
+}
+
+// IndexEntry is what keepstore's index response tells us about a
+// stored block.
+type KeepServiceIndexEntry struct {
+	SizedDigest
+	Mtime int64
+}
+
+// EachKeepService calls f once for every readable
+// KeepService. EachKeepService stops if it encounters an
+// error, such as f returning a non-nil error.
+func (c *Client) EachKeepService(f func(KeepService) error) error {
+	params := ResourceListParams{}
+	for {
+		var page KeepServiceList
+		err := c.RequestAndDecode(&page, "GET", "arvados/v1/keep_services", nil, params)
+		if err != nil {
+			return err
+		}
+		for _, item := range page.Items {
+			err = f(item)
+			if err != nil {
+				return err
+			}
+		}
+		params.Offset = params.Offset + len(page.Items)
+		if params.Offset >= page.ItemsAvailable {
+			return nil
+		}
+	}
+}
+
+func (s *KeepService) url(path string) string {
+	var f string
+	if s.ServiceSSLFlag {
+		f = "https://%s:%d/%s"
+	} else {
+		f = "http://%s:%d/%s"
+	}
+	return fmt.Sprintf(f, s.ServiceHost, s.ServicePort, path)
+}
+
+// String implements fmt.Stringer
+func (s *KeepService) String() string {
+	return s.UUID
+}
+
+// Index returns an unsorted list of blocks that can be retrieved from
+// this server.
+func (s *KeepService) Index(c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
+	url := s.url("index/" + prefix)
+	req, err := http.NewRequest("GET", url, nil)
+	if err != nil {
+		return nil, fmt.Errorf("NewRequest(%v): %v", url, err)
+	}
+	resp, err := c.Do(req)
+	if err != nil {
+		return nil, fmt.Errorf("Do(%v): %v", url, err)
+	} else if resp.StatusCode != 200 {
+		return nil, fmt.Errorf("%v: %v", url, resp.Status)
+	}
+	defer resp.Body.Close()
+
+	var entries []KeepServiceIndexEntry
+	scanner := bufio.NewScanner(resp.Body)
+	sawEOF := false
+	for scanner.Scan() {
+		if sawEOF {
+			return nil, fmt.Errorf("Index response contained non-terminal blank line")
+		}
+		line := scanner.Text()
+		if line == "" {
+			sawEOF = true
+			continue
+		}
+		fields := strings.Split(line, " ")
+		if len(fields) != 2 {
+			return nil, fmt.Errorf("Malformed index line %q: %d fields", line, len(fields))
+		}
+		mtime, err := strconv.ParseInt(fields[1], 10, 64)
+		if err != nil {
+			return nil, fmt.Errorf("Malformed index line %q: mtime: %v", line, err)
+		}
+		entries = append(entries, KeepServiceIndexEntry{
+			SizedDigest: SizedDigest(fields[0]),
+			Mtime:       mtime,
+		})
+	}
+	if err := scanner.Err(); err != nil {
+		return nil, fmt.Errorf("Error scanning index response: %v", err)
+	}
+	if !sawEOF {
+		return nil, fmt.Errorf("Index response had no EOF marker")
+	}
+	return entries, nil
+}
diff --git a/sdk/go/x/arvados/user.go b/sdk/go/x/arvados/user.go
new file mode 100644
index 0000000..cd506ee
--- /dev/null
+++ b/sdk/go/x/arvados/user.go
@@ -0,0 +1,14 @@
+package arvados
+
+type User struct {
+	UUID     string `json:"uuid,omitempty"`
+	IsActive bool   `json:"is_active"`
+	IsAdmin  bool   `json:"is_admin"`
+	Username string `json:"username,omitempty"`
+}
+
+func (c *Client) CurrentUser() (User, error) {
+	var u User
+	err := c.RequestAndDecode(&u, "GET", "arvados/v1/users/current", nil, nil)
+	return u, err
+}
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
new file mode 100644
index 0000000..1b42b77
--- /dev/null
+++ b/services/keep-balance/balance.go
@@ -0,0 +1,359 @@
+package main
+
+import (
+	"fmt"
+	"log"
+	"runtime"
+	"sync"
+	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/keepclient"
+	"git.curoverse.com/arvados.git/sdk/go/x/arvados"
+)
+
+type Balancer struct {
+	*BlockStateMap
+	KeepServices       map[string]*KeepService
+	DefaultReplication int
+	ServiceTypes       []string
+	Logger             *log.Logger
+	MinMtime           int64
+
+	collScanned  int
+	serviceRoots map[string]string
+	errors       []error
+	mutex        sync.Mutex
+}
+
+func (bal *Balancer) addCollection(coll arvados.Collection) error {
+	blkids, err := coll.SizedDigests()
+	if err != nil {
+		bal.mutex.Lock()
+		bal.errors = append(bal.errors, fmt.Errorf("%v: %v", coll.UUID, err))
+		bal.mutex.Unlock()
+		return nil
+	}
+	repl := bal.DefaultReplication
+	if coll.ReplicationDesired != nil {
+		repl = *coll.ReplicationDesired
+	}
+	debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
+	bal.BlockStateMap.IncreaseDesired(repl, blkids)
+	return nil
+}
+
+func (bal *Balancer) addKeepService(srv arvados.KeepService) error {
+	for _, t := range bal.ServiceTypes {
+		if t == srv.ServiceType {
+			bal.KeepServices[srv.UUID] = &KeepService{KeepService: srv}
+			return nil
+		}
+	}
+	bal.Logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
+	return nil
+}
+
+func (bal *Balancer) Logf(f string, args ...interface{}) {
+	if bal.Logger != nil {
+		bal.Logger.Printf(f, args...)
+	}
+}
+
+func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
+	u, err := c.CurrentUser()
+	if err != nil {
+		return fmt.Errorf("CurrentUser(): %v", err)
+	}
+	if !u.IsActive || !u.IsAdmin {
+		return fmt.Errorf("Current user (%s) is not an active admin user", u.UUID)
+	}
+	return nil
+}
+
+func (bal *Balancer) CheckSanityLate() error {
+	if bal.errors != nil {
+		for _, err := range bal.errors {
+			log.Println("deferred error:", err)
+		}
+		return fmt.Errorf("cannot proceed safely after deferred errors")
+	}
+
+	if bal.collScanned == 0 {
+		return fmt.Errorf("received zero collections")
+	}
+
+	anyDesired := false
+	bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
+		if blk.Desired > 0 {
+			anyDesired = true
+		}
+	})
+	if !anyDesired {
+		return fmt.Errorf("zero blocks have desired replication>0")
+	}
+
+	// TODO: no two services have identical indexes
+	// TODO: no collisions (same md5, different size)
+	return nil
+}
+
+// GetCurrentState determines the current replication state, and the
+// desired replication level, for every block that is either
+// retrievable or referenced.
+//
+// It determines the current replication state by reading the block index
+// from every known Keep service.
+//
+// It determines the desired replication level by retrieving all
+// collection manifests in the database (API server).
+//
+// It encodes the resulting information in BlockStateMap.
+func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
+	defer timeMe()("GetCurrentState")
+	bal.BlockStateMap = NewBlockStateMap()
+	bal.KeepServices = make(map[string]*KeepService)
+
+	dd, err := c.DiscoveryDocument()
+	if err != nil {
+		return err
+	}
+	bal.DefaultReplication = dd.DefaultCollectionReplication
+	bal.MinMtime = time.Now().Unix() - dd.BlobSignatureTTL
+
+	errs := make(chan error)
+	wg := sync.WaitGroup{}
+
+	if err := c.EachKeepService(bal.addKeepService); err != nil {
+		return err
+	}
+	for _, srv := range bal.KeepServices {
+		wg.Add(1)
+		go func(srv *KeepService) {
+			defer wg.Done()
+			bal.Logf("%s: retrieve index", srv)
+			idx, err := srv.Index(c, "")
+			if err != nil {
+				errs <- fmt.Errorf("%s: %v", srv, err)
+				return
+			}
+			bal.Logf("%s: add replicas to map", srv)
+			bal.BlockStateMap.AddReplicas(srv, idx)
+			bal.Logf("%s: done", srv)
+		}(srv)
+	}
+
+	collQ := make(chan arvados.Collection, 1000)
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for coll := range collQ {
+			err := bal.addCollection(coll)
+			if err != nil {
+				errs <- err
+				for _ = range collQ {
+				}
+				return
+			}
+			bal.collScanned++
+		}
+	}()
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		err = c.EachCollection(
+			func(coll arvados.Collection) error {
+				collQ <- coll
+				if len(errs) > 0 {
+					// some other GetCurrentState
+					// error happened: no point
+					// getting any more
+					// collections.
+					return fmt.Errorf("")
+				}
+				return nil
+			}, func(done, total int) {
+				bal.Logf("collections: %d/%d", done, total)
+			})
+		close(collQ)
+		if err != nil {
+			errs <- err
+		}
+	}()
+
+	go func() {
+		wg.Wait()
+		close(errs)
+	}()
+	return <-errs
+}
+
+// ComputeChangeSets compares, for each known block, the current and
+// desired replication states. If it is possible to get closer to the
+// desired state by copying or deleting blocks, it submits those
+// changes to the relevant KeepServices' ChangeSets.
+func (bal *Balancer) ComputeChangeSets() {
+	defer timeMe()("ComputeChangeSets")
+	bal.serviceRoots = make(map[string]string)
+	for _, srv := range bal.KeepServices {
+		bal.serviceRoots[srv.UUID] = srv.UUID
+	}
+
+	type balanceTask struct {
+		blkid arvados.SizedDigest
+		blk   *BlockState
+	}
+	todo := make(chan balanceTask, 16)
+	var wg sync.WaitGroup
+	for i := 0; i < 1+runtime.NumCPU(); i++ {
+		wg.Add(1)
+		go func() {
+			for work := range todo {
+				bal.balanceBlock(work.blkid, work.blk)
+			}
+			wg.Done()
+		}()
+	}
+	bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
+		todo <- balanceTask{
+			blkid: blkid,
+			blk:   blk,
+		}
+	})
+	close(todo)
+	wg.Wait()
+}
+
+// balanceBlock compares current state to desired state for a single
+// block, and makes the appropriate ChangeSet calls.
+func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
+	debugf("balanceBlock: %v %+v", blkid, blk)
+	uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
+	hasRepl := make(map[string]Replica, len(bal.serviceRoots))
+	for _, repl := range blk.Replicas {
+		hasRepl[repl.UUID] = repl
+	}
+	// To be safe we assume two replicas with the same Mtime are
+	// in fact the same replica being reported more than
+	// once. len(uniqueBestRepl) is the number of distinct
+	// replicas in the best rendezvous positions we've considered
+	// so far.
+	uniqueBestRepl := make(map[int64]bool, len(bal.serviceRoots))
+	// pulls is the number of Pull changes we have already
+	// requested. (For purposes of deciding whether to Pull to
+	// rendezvous position N, we should assume all pulls we have
+	// requested on rendezvous positions M<N will be successful.)
+	pulls := 0
+	for _, uuid := range uuids {
+		srv := bal.KeepServices[uuid]
+		// TODO: request a Touch if Mtime is duplicated.
+		if repl, ok := hasRepl[srv.UUID]; ok {
+			// This service has a replica. We should
+			// delete it if [1] we already have enough
+			// distinct replicas in better rendezvous
+			// positions and [2] this replica's Mtime is
+			// distinct from the better replicas' Mtime.
+			if len(uniqueBestRepl) >= blk.Desired &&
+				!uniqueBestRepl[repl.Mtime] &&
+				repl.Mtime < bal.MinMtime {
+				srv.AddTrash(Trash{
+					SizedDigest: blkid,
+					Mtime:       repl.Mtime,
+				})
+			}
+			uniqueBestRepl[repl.Mtime] = true
+		} else if pulls+len(uniqueBestRepl) < blk.Desired &&
+			len(blk.Replicas) > 0 &&
+			!srv.ReadOnly {
+			// This service doesn't have a replica. We
+			// should pull one to this server if we don't
+			// already have enough (existing+requested)
+			// replicas in better rendezvous positions.
+			srv.AddPull(Pull{
+				SizedDigest: blkid,
+				Source:      blk.Replicas[0].KeepService,
+			})
+			pulls++
+		}
+	}
+}
+
+type blocksNBytes struct {
+	replicas int
+	blocks   int
+	bytes    int64
+}
+
+func (bb blocksNBytes) String() string {
+	return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
+}
+
+func (bal *Balancer) PrintStatistics() {
+	var lost, overrep, unref, garbage, underrep, justright, desired, current blocksNBytes
+	bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
+		surplus := len(blk.Replicas) - blk.Desired
+		bytes := blkid.Size()
+		switch {
+		case len(blk.Replicas) == 0 && blk.Desired > 0:
+			lost.replicas -= surplus
+			lost.blocks++
+			lost.bytes += bytes * int64(-surplus)
+		case len(blk.Replicas) < blk.Desired:
+			underrep.replicas -= surplus
+			underrep.blocks++
+			underrep.bytes += bytes * int64(-surplus)
+		case len(blk.Replicas) > 0 && blk.Desired == 0:
+			counter := &garbage
+			for _, r := range blk.Replicas {
+				if r.Mtime >= bal.MinMtime {
+					counter = &unref
+					break
+				}
+			}
+			counter.replicas += surplus
+			counter.blocks++
+			counter.bytes += bytes * int64(surplus)
+		case len(blk.Replicas) > blk.Desired:
+			overrep.replicas += surplus
+			overrep.blocks++
+			overrep.bytes += bytes * int64(len(blk.Replicas)-blk.Desired)
+		default:
+			justright.replicas += blk.Desired
+			justright.blocks++
+			justright.bytes += bytes * int64(blk.Desired)
+		}
+
+		if blk.Desired > 0 {
+			desired.replicas += blk.Desired
+			desired.blocks++
+			desired.bytes += bytes * int64(blk.Desired)
+		}
+		current.replicas += len(blk.Replicas)
+		current.blocks++
+		current.bytes += bytes * int64(len(blk.Replicas))
+	})
+	fmt.Println("===")
+	fmt.Println(lost, "lost (0=have<want)")
+	fmt.Println(underrep, "underreplicated (0<have<want)")
+	fmt.Println(justright, "just right (have=want)")
+	fmt.Println(overrep, "overreplicated (have>want>0)")
+	fmt.Println(unref, "unreferenced (have>want=0, new)")
+	fmt.Println(garbage, "garbage (have>want=0, old)")
+	fmt.Println("===")
+	fmt.Println(desired, "total commitment (excluding unreferenced)")
+	fmt.Println(current, "total usage")
+	fmt.Println("===")
+	for _, srv := range bal.KeepServices {
+		fmt.Printf("%s: %v\n", srv, srv.ChangeSet)
+	}
+	fmt.Println("===")
+}
+
+func (bal *Balancer) ApplyChangeSets(c *arvados.Client) error {
+	defer timeMe()("ApplyChangeSets")
+	if err := bal.CheckSanityLate(); err != nil {
+		return err
+	}
+	return fmt.Errorf("unimplemented (FIXME)")
+}
diff --git a/services/keep-balance/block_state.go b/services/keep-balance/block_state.go
new file mode 100644
index 0000000..f8edec5
--- /dev/null
+++ b/services/keep-balance/block_state.go
@@ -0,0 +1,87 @@
+package main
+
+import (
+	"sync"
+
+	"git.curoverse.com/arvados.git/sdk/go/x/arvados"
+)
+
+type Replica struct {
+	*KeepService
+	Mtime int64
+}
+
+type BlockState struct {
+	Replicas []Replica
+	Desired  int
+}
+
+func (bs *BlockState) AddReplica(r Replica) {
+	bs.Replicas = append(bs.Replicas, r)
+}
+
+func (bs *BlockState) IncreaseDesired(n int) {
+	if bs.Desired < n {
+		bs.Desired = n
+	}
+}
+
+// BlockStateMap is a goroutine-safe wrapper around a
+// map[arvados.SizedDigest]*BlockState.
+type BlockStateMap struct {
+	entries map[arvados.SizedDigest]*BlockState
+	mutex  sync.Mutex
+}
+
+func NewBlockStateMap() *BlockStateMap {
+	return &BlockStateMap{
+		entries: make(map[arvados.SizedDigest]*BlockState),
+	}
+}
+
+// Get returns a BlockState entry, allocating a new one if needed.
+func (bsm *BlockStateMap) Get(blkid arvados.SizedDigest) *BlockState {
+	// TODO? Allocate BlockState structs a slice at a time,
+	// instead of one at a time.
+	blk := bsm.entries[blkid]
+	if blk == nil {
+		blk = &BlockState{}
+		bsm.entries[blkid] = blk
+	}
+	return blk
+}
+
+// Apply runs f on each entry in the map.
+func (bsm *BlockStateMap) Apply(f func(arvados.SizedDigest, *BlockState)) {
+	bsm.mutex.Lock()
+	defer bsm.mutex.Unlock()
+
+	for blkid, blk := range bsm.entries {
+		f(blkid, blk)
+	}
+}
+
+// AddReplicas updates the map to indicate srv has a replica of each
+// block in idx.
+func (bsm *BlockStateMap) AddReplicas(srv *KeepService, idx []arvados.KeepServiceIndexEntry) {
+	bsm.mutex.Lock()
+	defer bsm.mutex.Unlock()
+
+	for _, ent := range idx {
+		bsm.Get(ent.SizedDigest).AddReplica(Replica{
+			KeepService: srv,
+			Mtime:       ent.Mtime,
+		})
+	}
+}
+
+// IncreaseDesired updates the map to indicate the desired replication
+// for the given blocks is at least n.
+func (bsm *BlockStateMap) IncreaseDesired(n int, blocks []arvados.SizedDigest) {
+	bsm.mutex.Lock()
+	defer bsm.mutex.Unlock()
+
+	for _, blkid := range blocks {
+		bsm.Get(blkid).IncreaseDesired(n)
+	}
+}
diff --git a/services/keep-balance/change_set.go b/services/keep-balance/change_set.go
new file mode 100644
index 0000000..6b79b18
--- /dev/null
+++ b/services/keep-balance/change_set.go
@@ -0,0 +1,50 @@
+package main
+
+import (
+	"fmt"
+	"sync"
+
+	"git.curoverse.com/arvados.git/sdk/go/x/arvados"
+)
+
+// Pull is a request to retrieve a block from a remote server, and
+// store it locally.
+type Pull struct {
+	arvados.SizedDigest
+	Source *KeepService
+}
+
+// Trash is a request to delete a block.
+type Trash struct {
+	arvados.SizedDigest
+	Mtime int64
+}
+
+// ChangeSet is a set of change requests that will be sent to a
+// keepstore server.
+type ChangeSet struct {
+	Pulls   []Pull
+	Trashes []Trash
+	mutex   sync.Mutex
+}
+
+// AddPull adds a Pull operation.
+func (cs *ChangeSet) AddPull(p Pull) {
+	cs.mutex.Lock()
+	cs.Pulls = append(cs.Pulls, p)
+	cs.mutex.Unlock()
+}
+
+// AddTrash adds a Trash operation
+func (cs *ChangeSet) AddTrash(t Trash) {
+	cs.mutex.Lock()
+	cs.Trashes = append(cs.Trashes, t)
+	cs.mutex.Unlock()
+}
+
+// String implements fmt.Stringer.
+func (cs ChangeSet) String() string {
+	cs.mutex.Lock()
+	defer cs.mutex.Unlock()
+	return fmt.Sprintf("ChangeSet{Pulls:%d, Trashes:%d}", len(cs.Pulls), len(cs.Trashes))
+}
diff --git a/services/keep-balance/example-config.json b/services/keep-balance/example-config.json
new file mode 100644
index 0000000..a6a6e58
--- /dev/null
+++ b/services/keep-balance/example-config.json
@@ -0,0 +1,11 @@
+{
+    "Client": {
+        "APIHost": "4xphq.arvadosapi.com:443",
+        "AuthToken": "zzzzzz",
+        "Insecure": false
+    },
+    "Apply": false,
+    "ServiceTypes": [
+        "disk"
+    ]
+}
diff --git a/services/keep-balance/keep_service.go b/services/keep-balance/keep_service.go
new file mode 100644
index 0000000..f4db28e
--- /dev/null
+++ b/services/keep-balance/keep_service.go
@@ -0,0 +1,15 @@
+package main
+
+import (
+	"fmt"
+	"git.curoverse.com/arvados.git/sdk/go/x/arvados"
+)
+
+type KeepService struct {
+	arvados.KeepService
+	ChangeSet
+}
+
+func (srv KeepService) String() string {
+	return fmt.Sprintf("%s (%s:%d, %s)", srv.UUID, srv.ServiceHost, srv.ServicePort, srv.ServiceType)
+}
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
new file mode 100644
index 0000000..25ed818
--- /dev/null
+++ b/services/keep-balance/main.go
@@ -0,0 +1,71 @@
+package main
+
+import (
+	"encoding/json"
+	"flag"
+	"io/ioutil"
+	"log"
+	"os"
+
+	"git.curoverse.com/arvados.git/sdk/go/x/arvados"
+)
+
+type Config struct {
+	Client       arvados.Client
+	Apply        bool
+	ServiceTypes []string
+}
+
+var debugf func(string, ...interface{}) = func(string, ...interface{}) {}
+
+func main() {
+	var config Config
+
+	configPath := flag.String("config",
+		os.Getenv("HOME")+"/.config/arvados/keep-balance.json",
+		"configuration file")
+	debugFlag := flag.Bool("debug", false, "enable debug messages")
+	flag.Parse()
+
+	if buf, err := ioutil.ReadFile(*configPath); err != nil {
+		log.Fatalf("Reading %q: %v", *configPath, err)
+	} else if err = json.Unmarshal(buf, &config); err != nil {
+		log.Fatalf("Decoding %q: %v", *configPath, err)
+	}
+
+	if *debugFlag {
+		debugf = log.Printf
+		if j, err := json.Marshal(config); err != nil {
+			log.Fatal(err)
+		} else {
+			log.Printf("config is %s", j)
+		}
+	}
+	if err := Run(config); err != nil {
+		log.Fatal(err)
+	}
+}
+
+func Run(config Config) error {
+	bal := Balancer{
+		Logger:       log.New(os.Stderr, "", log.LstdFlags),
+		ServiceTypes: config.ServiceTypes,
+	}
+
+	var err error
+	if err = bal.CheckSanityEarly(&config.Client); err != nil {
+		return err
+	}
+	if err = bal.GetCurrentState(&config.Client); err != nil {
+		return err
+	}
+	bal.ComputeChangeSets()
+	bal.PrintStatistics()
+	if err = bal.CheckSanityLate(); err != nil {
+		return err
+	}
+	if config.Apply {
+		err = bal.ApplyChangeSets(&config.Client)
+	}
+	return err
+}
diff --git a/services/keep-balance/time_me.go b/services/keep-balance/time_me.go
new file mode 100644
index 0000000..549ccd7
--- /dev/null
+++ b/services/keep-balance/time_me.go
@@ -0,0 +1,13 @@
+package main
+
+import (
+	"fmt"
+	"time"
+)
+
+func timeMe() func(string) {
+	t0 := time.Now()
+	return func(label string) {
+		fmt.Printf("%s: %v\n", label, time.Since(t0))
+	}
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list