[ARVADOS] updated: e4e73f9d2b2b2392eda9b343803d42fc88cebca9
Git user
git at public.curoverse.com
Mon May 16 10:17:31 EDT 2016
Summary of changes:
sdk/go/x/arvados/client.go | 7 +++++++
1 file changed, 7 insertions(+)
discards ed7489197a4ec63c8f31214b87e7b96185fa15e5 (commit)
via e4e73f9d2b2b2392eda9b343803d42fc88cebca9 (commit)
This update added new revisions after undoing existing revisions. That is
to say, the old revision is not a strict subset of the new revision. This
situation occurs when you --force push a change and generate a repository
containing something like this:
 * -- * -- B -- O -- O -- O (ed7489197a4ec63c8f31214b87e7b96185fa15e5)
            \
             N -- N -- N (e4e73f9d2b2b2392eda9b343803d42fc88cebca9)
When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit e4e73f9d2b2b2392eda9b343803d42fc88cebca9
Author: Tom Clegg <tom at curoverse.com>
Date: Mon May 16 10:17:26 2016 -0400
9162: Add keep-balance
diff --git a/build/run-tests.sh b/build/run-tests.sh
index d229341..fbbedf6 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -70,6 +70,7 @@ services/fuse
services/keep-web
services/keepproxy
services/keepstore
+services/keep-balance
services/login-sync
services/nodemanager
services/crunch-run
@@ -86,6 +87,7 @@ sdk/go/manifest
sdk/go/blockdigest
sdk/go/streamer
sdk/go/crunchrunner
+sdk/go/x/arvados
sdk/cwl
tools/crunchstat-summary
tools/keep-rsync
@@ -709,11 +711,13 @@ gostuff=(
sdk/go/manifest
sdk/go/streamer
sdk/go/crunchrunner
+ sdk/go/x/arvados
services/arv-git-httpd
services/crunchstat
services/keep-web
services/keepstore
sdk/go/keepclient
+ services/keep-balance
services/keepproxy
services/datamanager/summary
services/datamanager/collection
diff --git a/sdk/go/x/arvados/client.go b/sdk/go/x/arvados/client.go
new file mode 100644
index 0000000..f3f1286
--- /dev/null
+++ b/sdk/go/x/arvados/client.go
@@ -0,0 +1,139 @@
+package arvados
+
+import (
+	"encoding/json"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"net/http"
+	"net/url"
+	"os"
+	"strings"
+)
+
+// Client holds the settings used to send requests to an Arvados API
+// server.
+type Client struct {
+ // Client, if non-nil, is used to send requests; otherwise
+ // http.DefaultClient is used (see httpClient).
+ *http.Client
+ // APIHost is the host part of the API server URL; apiURL
+ // prefixes it with "https://".
+ APIHost string
+ // AuthToken, if non-empty, is sent by Do in an "OAuth2"
+ // Authorization header.
+ AuthToken string
+ // Insecure is set by NewClientFromEnv when
+ // ARVADOS_API_HOST_INSECURE is non-empty.
+ // NOTE(review): nothing in this file reads Insecure, so it does
+ // not appear to affect TLS verification yet -- confirm.
+ Insecure bool
+}
+
+// NewClientFromEnv returns a Client configured from the process
+// environment: ARVADOS_API_HOST supplies the API host,
+// ARVADOS_API_TOKEN supplies the auth token, and any non-empty
+// ARVADOS_API_HOST_INSECURE value sets the Insecure flag.
+func NewClientFromEnv() *Client {
+	insecure := os.Getenv("ARVADOS_API_HOST_INSECURE") != ""
+	client := &Client{Insecure: insecure}
+	client.APIHost = os.Getenv("ARVADOS_API_HOST")
+	client.AuthToken = os.Getenv("ARVADOS_API_TOKEN")
+	return client
+}
+
+// Do sends req with the client returned by httpClient. If AuthToken
+// is non-empty, an "Authorization: OAuth2 <token>" header is added
+// to req first.
+func (c *Client) Do(req *http.Request) (*http.Response, error) {
+	if token := c.AuthToken; token != "" {
+		req.Header.Add("Authorization", "OAuth2 "+token)
+	}
+	hc := c.httpClient()
+	return hc.Do(req)
+}
+
+// DoAndDecode sends req via Do and decodes the JSON response body
+// into dst. Any status other than 200 is reported as an error. Use
+// this instead of RequestAndDecode when you need to build the
+// http.Request yourself.
+func (c *Client) DoAndDecode(dst interface{}, req *http.Request) error {
+	resp, err := c.Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+
+	// The body is read in full before the status is examined, so a
+	// read error takes precedence over a bad status.
+	payload, readErr := ioutil.ReadAll(resp.Body)
+	switch {
+	case readErr != nil:
+		return readErr
+	case resp.StatusCode != 200:
+		return fmt.Errorf("request failed (%s): %s", req.URL, resp.Status)
+	}
+	return json.Unmarshal(payload, dst)
+}
+
+// RequestAndDecode performs an API request and unmarshals the
+// response (which must be JSON) into dst. Method and body arguments
+// are the same as for http.NewRequest(). The given path is added to
+// the server's scheme/host/port to form the request URL.
+//
+// params may be nil, a url.Values, or any value that json.Marshal
+// encodes as a JSON object (typically a struct). If method is "GET"
+// or a body is given, params are sent in the query string.
+// Otherwise they are sent as a form-encoded request body.
+//
+// path must not contain a query string.
+func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error {
+	urlString := c.apiURL(path)
+	var urlValues url.Values
+	if v, ok := params.(url.Values); ok {
+		urlValues = v
+	} else if params != nil {
+		// Convert an arbitrary struct to url.Values. For
+		// example, Foo{Bar: []int{1,2,3}, Baz: "waz"} becomes
+		// url.Values{`Bar`:`[1,2,3]`,`Baz`:`waz`}: string
+		// values are passed as-is, everything else is
+		// JSON-encoded.
+		//
+		// TODO: Do this more efficiently, possibly using
+		// json.Decode/Encode, so the whole thing doesn't have
+		// to get encoded, decoded, and re-encoded.
+		j, err := json.Marshal(params)
+		if err != nil {
+			return err
+		}
+		var generic map[string]interface{}
+		err = json.Unmarshal(j, &generic)
+		if err != nil {
+			return err
+		}
+		urlValues = url.Values{}
+		for k, v := range generic {
+			if v, ok := v.(string); ok {
+				urlValues.Set(k, v)
+				continue
+			}
+			j, err := json.Marshal(v)
+			if err != nil {
+				return err
+			}
+			urlValues.Set(k, string(j))
+		}
+	}
+
+	sendAsForm := false
+	switch {
+	case urlValues == nil:
+		// No params to send.
+	case method == "GET" || body != nil:
+		// The request body is unavailable (GET) or already
+		// taken by the caller, so params go in the query
+		// string.
+		//
+		// FIXME: what if params don't fit in URL
+		u, err := url.Parse(urlString)
+		if err != nil {
+			return err
+		}
+		u.RawQuery = urlValues.Encode()
+		urlString = u.String()
+	default:
+		// Previously params were silently dropped in this
+		// case; send them as a form-encoded body instead.
+		body = strings.NewReader(urlValues.Encode())
+		sendAsForm = true
+	}
+
+	req, err := http.NewRequest(method, urlString, body)
+	if err != nil {
+		return err
+	}
+	if sendAsForm {
+		req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
+	}
+	return c.DoAndDecode(dst, req)
+}
+
+// httpClient returns the *http.Client used to send requests: the
+// embedded Client if one is set, otherwise http.DefaultClient.
+func (c *Client) httpClient() *http.Client {
+	if hc := c.Client; hc != nil {
+		return hc
+	}
+	return http.DefaultClient
+}
+
+// apiURL returns the https URL for the given path on APIHost. path
+// is expected not to start with a slash.
+func (c *Client) apiURL(path string) string {
+	return fmt.Sprintf("https://%s/%s", c.APIHost, path)
+}
+
+// DiscoveryDocument is the Arvados server's description of itself.
+// Only the fields listed here are decoded.
+type DiscoveryDocument struct {
+ // DefaultCollectionReplication is decoded from the
+ // "defaultCollectionReplication" key.
+ DefaultCollectionReplication int `json:"defaultCollectionReplication"`
+ // BlobSignatureTTL is decoded from the "blobSignatureTtl" key.
+ // NOTE(review): keep-balance subtracts it from a Unix timestamp
+ // in seconds, so it is presumably in seconds -- confirm.
+ BlobSignatureTTL int64 `json:"blobSignatureTtl"`
+}
+
+// DiscoveryDocument fetches the server's discovery document and
+// returns it as a *DiscoveryDocument. The returned object should not
+// be modified: the same object may be returned by subsequent calls.
+//
+// On failure the returned pointer is nil, so callers never see a
+// partially decoded document.
+func (c *Client) DiscoveryDocument() (*DiscoveryDocument, error) {
+	var dd DiscoveryDocument
+	err := c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil)
+	if err != nil {
+		return nil, err
+	}
+	return &dd, nil
+}
diff --git a/sdk/go/x/arvados/client_test.go b/sdk/go/x/arvados/client_test.go
new file mode 100644
index 0000000..2db50bf
--- /dev/null
+++ b/sdk/go/x/arvados/client_test.go
@@ -0,0 +1,83 @@
+package arvados
+
+import (
+ "bytes"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "sync"
+ "testing"
+)
+
+// stubTransport is an http.RoundTripper for tests. It records every
+// request it receives and answers from a canned table keyed by URL
+// path.
+type stubTransport struct {
+	Responses map[string]string
+	Requests  []http.Request
+	sync.Mutex
+}
+
+// RoundTrip records req and replies with the canned body for
+// req.URL.Path and status 200. A path with no entry, or an empty
+// entry, yields status 404 with body "{}".
+func (stub *stubTransport) RoundTrip(req *http.Request) (*http.Response, error) {
+	stub.Lock()
+	stub.Requests = append(stub.Requests, *req)
+	stub.Unlock()
+
+	status, code := "200 OK", 200
+	payload := stub.Responses[req.URL.Path]
+	if payload == "" {
+		status, code, payload = "404 Not Found", 404, "{}"
+	}
+	buf := bytes.NewBufferString(payload)
+	return &http.Response{
+		Status:        status,
+		StatusCode:    code,
+		Proto:         "HTTP/1.1",
+		ProtoMajor:    1,
+		ProtoMinor:    1,
+		Request:       req,
+		ContentLength: int64(buf.Len()),
+		Body:          ioutil.NopCloser(buf),
+	}, nil
+}
+
+// errorTransport is an http.RoundTripper that fails every request.
+type errorTransport struct{}
+
+// RoundTrip always returns a nil response and a non-nil error.
+func (*errorTransport) RoundTrip(*http.Request) (*http.Response, error) {
+	err := fmt.Errorf("something awful happened")
+	return nil, err
+}
+
+// TestCurrentUser checks that CurrentUser decodes the stubbed
+// response, sends the auth token in the Authorization header, and
+// reports transport errors.
+func TestCurrentUser(t *testing.T) {
+	t.Parallel()
+	const wantUUID = "zzzzz-abcde-012340123401234"
+	stub := &stubTransport{
+		Responses: map[string]string{
+			"/arvados/v1/users/current": `{"uuid":"zzzzz-abcde-012340123401234"}`,
+		},
+	}
+	c := &Client{
+		Client:    &http.Client{Transport: stub},
+		APIHost:   "zzzzz.arvadosapi.com",
+		AuthToken: "xyzzy",
+	}
+
+	u, err := c.CurrentUser()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if u.UUID != wantUUID {
+		t.Errorf("got uuid %q, expected %q", u.UUID, wantUUID)
+	}
+	if len(stub.Requests) < 1 {
+		t.Fatal("empty stub.Requests")
+	}
+	lastReq := stub.Requests[len(stub.Requests)-1]
+	if hdr := lastReq.Header; hdr.Get("Authorization") != "OAuth2 xyzzy" {
+		t.Errorf("got headers %+q, expected Authorization header", hdr)
+	}
+
+	// Swap in a transport that always fails.
+	c.Client.Transport = &errorTransport{}
+	if _, err = c.CurrentUser(); err == nil {
+		t.Errorf("got nil error, expected something awful")
+	}
+}
diff --git a/sdk/go/x/arvados/collection.go b/sdk/go/x/arvados/collection.go
new file mode 100644
index 0000000..602b02c
--- /dev/null
+++ b/sdk/go/x/arvados/collection.go
@@ -0,0 +1,155 @@
+package arvados
+
+import (
+ "bufio"
+ "encoding/json"
+ "fmt"
+ "strings"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/manifest"
+)
+
+// Collection holds the collection attributes this package decodes
+// from, and encodes to, JSON. Every field is tagged omitempty, so
+// zero values and nil pointers are left out when encoding.
+type Collection struct {
+ UUID string `json:"uuid,omitempty"`
+ ExpiresAt *time.Time `json:"expires_at,omitempty"`
+ // ManifestText is parsed line by line by SizedDigests.
+ ManifestText string `json:"manifest_text,omitempty"`
+ CreatedAt *time.Time `json:"created_at,omitempty"`
+ // ModifiedAt, together with UUID, is what EachCollection uses
+ // to order and de-duplicate pages of results.
+ ModifiedAt *time.Time `json:"modified_at,omitempty"`
+ PortableDataHash string `json:"portable_data_hash,omitempty"`
+ ReplicationConfirmed *int `json:"replication_confirmed,omitempty"`
+ ReplicationConfirmedAt *time.Time `json:"replication_confirmed_at,omitempty"`
+ // ReplicationDesired is nil when the response carried no value.
+ ReplicationDesired *int `json:"replication_desired,omitempty"`
+}
+
+// SizedDigests returns the block locators referenced by the
+// collection's manifest, each truncated after its size field
+// ("hash+size"). A block referenced more than once appears more
+// than once in the result.
+//
+// It returns an error if ManifestText is empty while
+// PortableDataHash is not "d41d8cd98f00b204e9800998ecf8427e+0", if
+// any manifest line has fewer than 3 space-separated tokens, or if
+// scanning fails.
+func (c *Collection) SizedDigests() ([]SizedDigest, error) {
+ if c.ManifestText == "" && c.PortableDataHash != "d41d8cd98f00b204e9800998ecf8427e+0" {
+ // TODO: Check more subtle forms of corruption, too
+ return nil, fmt.Errorf("manifest is missing")
+ }
+ var sds []SizedDigest
+ scanner := bufio.NewScanner(strings.NewReader(c.ManifestText))
+ // Start with a 1 MiB buffer and let it grow to the size of the
+ // manifest, so a single long line can be scanned.
+ scanner.Buffer(make([]byte, 1048576), len(c.ManifestText))
+ for scanner.Scan() {
+ line := scanner.Text()
+ tokens := strings.Split(line, " ")
+ if len(tokens) < 3 {
+ return nil, fmt.Errorf("Invalid stream (<3 tokens): %q", line)
+ }
+ // tokens[0] is skipped; the following tokens are taken as
+ // locators until the first one that does not match
+ // manifest.LocatorPattern.
+ for _, token := range tokens[1:] {
+ if !manifest.LocatorPattern.MatchString(token) {
+ // FIXME: ensure it's a file token
+ break
+ }
+ // Drop everything from the second "+" onward, keeping
+ // "hash+size".
+ // NOTE(review): token[33:] assumes LocatorPattern
+ // only matches tokens of at least 33 bytes -- confirm.
+ // FIXME: shouldn't assume 32 char hash
+ if i := strings.IndexRune(token[33:], '+'); i >= 0 {
+ token = token[:33+i]
+ }
+ sds = append(sds, SizedDigest(token))
+ }
+ }
+ return sds, scanner.Err()
+}
+
+// CollectionList is one page of a collection list response.
+type CollectionList struct {
+ Items []Collection `json:"items"`
+ // ItemsAvailable is decoded from "items_available";
+ // EachCollection uses it as the expected total number of
+ // collections.
+ ItemsAvailable int `json:"items_available"`
+ Offset int `json:"offset"`
+ Limit int `json:"limit"`
+}
+
+// Filter is a single condition in the "filters" parameter of a list
+// request: attribute name, comparison operator, and operand.
+type Filter struct {
+	Attr     string
+	Operator string
+	Operand  interface{}
+}
+
+// MarshalJSON encodes f as the 3-element JSON array
+// [Attr, Operator, Operand].
+//
+// It uses a value receiver so that both Filter and *Filter implement
+// json.Marshaler. With a pointer receiver, a non-addressable Filter
+// value (e.g. one passed directly to json.Marshal or stored as a map
+// value) would be encoded as a JSON object instead.
+func (f Filter) MarshalJSON() ([]byte, error) {
+	return json.Marshal([]interface{}{f.Attr, f.Operator, f.Operand})
+}
+
+// ResourceListParams holds the parameters of a list request. Every
+// field is tagged omitempty, so unset fields are not sent.
+type ResourceListParams struct {
+ Select []string `json:"select,omitempty"`
+ Filters []Filter `json:"filters,omitempty"`
+ // Limit is a pointer so that an explicit limit of 0 is still
+ // encoded; a nil Limit is omitted.
+ Limit *int `json:"limit,omitempty"`
+ // Offset is omitted when it is 0.
+ Offset int `json:"offset,omitempty"`
+ Order string `json:"order,omitempty"`
+}
+
+// EachCollection calls f once for every readable
+// collection. EachCollection stops if it encounters an error, such as
+// f returning a non-nil error.
+//
+// The progress function is called periodically with done (number of
+// times f has been called) and total (number of times f is expected
+// to be called). progress may be nil.
+//
+// Collections are fetched in pages of up to 1000, ordered by
+// (modified_at, uuid). Each page after the first asks for
+// modified_at >= the last timestamp seen, and skips items that were
+// already delivered.
+func (c *Client) EachCollection(f func(Collection) error, progress func(done, total int)) error {
+	if progress == nil {
+		progress = func(_, _ int) {}
+	}
+
+	// A request with limit=0 returns no items but still reports
+	// how many are available.
+	var expectCount int
+	{
+		var page CollectionList
+		var zero int
+		err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, ResourceListParams{Limit: &zero})
+		if err != nil {
+			return err
+		}
+		expectCount = page.ItemsAvailable
+	}
+
+	limit := 1000
+	params := ResourceListParams{
+		Limit:  &limit,
+		Order:  "modified_at, uuid",
+		Select: []string{"uuid", "manifest_text", "modified_at", "portable_data_hash"},
+	}
+	var last Collection
+	var filterTime time.Time
+	callCount := 0
+	for {
+		progress(callCount, expectCount)
+		var page CollectionList
+		err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
+		if err != nil {
+			return err
+		}
+		for _, coll := range page.Items {
+			if coll.ModifiedAt == nil {
+				// Paging depends on modified_at; fail
+				// cleanly instead of dereferencing nil.
+				return fmt.Errorf("collection %q has no modified_at, cannot page through collections", coll.UUID)
+			}
+			if last.ModifiedAt != nil && last.ModifiedAt.Equal(*coll.ModifiedAt) && last.UUID >= coll.UUID {
+				// Already delivered on a previous page.
+				continue
+			}
+			callCount++
+			err = f(coll)
+			if err != nil {
+				return err
+			}
+			last = coll
+		}
+		if last.ModifiedAt == nil {
+			// No collection has been seen at all (e.g. the
+			// first page was empty), so there is nothing to
+			// page past.
+			break
+		}
+		if last.ModifiedAt.Equal(filterTime) {
+			if page.ItemsAvailable > len(page.Items) {
+				// TODO: use "mtime=X && UUID>Y"
+				// filters to get all collections with
+				// this timestamp, then use "mtime>X"
+				// to get the next timestamp.
+				return fmt.Errorf("BUG: Received an entire page with the same modified_at timestamp (%v), cannot make progress", filterTime)
+			}
+			break
+		}
+		filterTime = *last.ModifiedAt
+		params.Filters = []Filter{{
+			Attr:     "modified_at",
+			Operator: ">=",
+			Operand:  filterTime,
+		}, {
+			Attr:     "uuid",
+			Operator: "!=",
+			Operand:  last.UUID,
+		}}
+	}
+	progress(callCount, expectCount)
+	if callCount < expectCount {
+		return fmt.Errorf("Retrieved only %d collections, expected %d", callCount, expectCount)
+	}
+	return nil
+}
diff --git a/sdk/go/x/arvados/keep_block.go b/sdk/go/x/arvados/keep_block.go
new file mode 100644
index 0000000..68ef1d5
--- /dev/null
+++ b/sdk/go/x/arvados/keep_block.go
@@ -0,0 +1,14 @@
+package arvados
+
+import (
+ "strconv"
+ "strings"
+)
+
+// SizedDigest is a block identifier of the form "hash+size".
+type SizedDigest string
+
+// Size returns the size field of sd, i.e. the decimal number after
+// the first "+". It returns 0 if sd has no "+" (previously this
+// caused an index-out-of-range panic) or if the size field is not a
+// decimal number.
+func (sd SizedDigest) Size() int64 {
+	fields := strings.SplitN(string(sd), "+", 3)
+	if len(fields) < 2 {
+		return 0
+	}
+	// The error is deliberately ignored: ParseInt yields 0 for a
+	// malformed number, which is the documented fallback.
+	n, _ := strconv.ParseInt(fields[1], 10, 64)
+	return n
+}
diff --git a/sdk/go/x/arvados/keep_service.go b/sdk/go/x/arvados/keep_service.go
new file mode 100644
index 0000000..ff4a699
--- /dev/null
+++ b/sdk/go/x/arvados/keep_service.go
@@ -0,0 +1,121 @@
+package arvados
+
+import (
+ "bufio"
+ "fmt"
+ "net/http"
+ "strconv"
+ "strings"
+)
+
+// KeepService holds the keep_services attributes this package
+// decodes from JSON.
+type KeepService struct {
+ UUID string `json:"uuid"`
+ // ServiceHost, ServicePort and ServiceSSLFlag determine the base
+ // URL built by the url method.
+ ServiceHost string `json:"service_host"`
+ ServicePort int `json:"service_port"`
+ ServiceSSLFlag bool `json:"service_ssl_flag"`
+ ServiceType string `json:"service_type"`
+ ReadOnly bool `json:"read_only"`
+}
+
+// KeepServiceList is one page of a keep_services list response.
+type KeepServiceList struct {
+ Items []KeepService `json:"items"`
+ // ItemsAvailable is decoded from "items_available";
+ // EachKeepService stops paging once its offset reaches it.
+ ItemsAvailable int `json:"items_available"`
+ Offset int `json:"offset"`
+ Limit int `json:"limit"`
+}
+
+// KeepServiceIndexEntry is what keepstore's index response tells us
+// about a stored block.
+type KeepServiceIndexEntry struct {
+ // SizedDigest is the first field of the index line.
+ SizedDigest
+ // Mtime is the second field of the index line, parsed as a
+ // base-10 integer.
+ // NOTE(review): presumably a Unix timestamp in seconds --
+ // confirm against keepstore.
+ Mtime int64
+}
+
+// EachKeepService calls f once for every readable
+// KeepService. EachKeepService stops if it encounters an
+// error, such as f returning a non-nil error.
+//
+// Services are fetched page by page, advancing the offset by the
+// number of items received, until the offset reaches the
+// items_available count reported by the server.
+func (c *Client) EachKeepService(f func(KeepService) error) error {
+	params := ResourceListParams{}
+	for {
+		var page KeepServiceList
+		err := c.RequestAndDecode(&page, "GET", "arvados/v1/keep_services", nil, params)
+		if err != nil {
+			return err
+		}
+		for _, item := range page.Items {
+			err = f(item)
+			if err != nil {
+				return err
+			}
+		}
+		params.Offset += len(page.Items)
+		if params.Offset >= page.ItemsAvailable {
+			return nil
+		}
+		if len(page.Items) == 0 {
+			// The offset cannot advance, so repeating the
+			// same request would loop forever.
+			return fmt.Errorf("keep_services list returned no items at offset %d, expected %d items", params.Offset, page.ItemsAvailable)
+		}
+	}
+}
+
+// url returns the URL for path on this service. The scheme is https
+// if ServiceSSLFlag is set, http otherwise.
+func (s *KeepService) url(path string) string {
+	scheme := "http"
+	if s.ServiceSSLFlag {
+		scheme = "https"
+	}
+	return fmt.Sprintf("%s://%s:%d/%s", scheme, s.ServiceHost, s.ServicePort, path)
+}
+
+// String implements fmt.Stringer. It returns the service's UUID.
+func (s *KeepService) String() string {
+ return s.UUID
+}
+
+// Index returns an unsorted list of blocks that can be retrieved from
+// this server.
+//
+// It requests "index/<prefix>" from the service using c, so c's auth
+// token is sent. Each response line must have exactly two
+// space-separated fields, a sized digest and an integer mtime, and
+// the response must end with a single blank line. Any deviation is
+// reported as an error and no entries are returned.
+func (s *KeepService) Index(c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
+	u := s.url("index/" + prefix)
+	req, err := http.NewRequest("GET", u, nil)
+	if err != nil {
+		return nil, fmt.Errorf("NewRequest(%v): %v", u, err)
+	}
+	resp, err := c.Do(req)
+	if err != nil {
+		return nil, fmt.Errorf("Do(%v): %v", u, err)
+	}
+	// Close the body on every return path, including the non-200
+	// case, which previously leaked it.
+	defer resp.Body.Close()
+	if resp.StatusCode != 200 {
+		return nil, fmt.Errorf("%v: %v", u, resp.Status)
+	}
+
+	var entries []KeepServiceIndexEntry
+	scanner := bufio.NewScanner(resp.Body)
+	sawEOF := false
+	for scanner.Scan() {
+		if sawEOF {
+			// Something followed the blank line.
+			return nil, fmt.Errorf("Index response contained non-terminal blank line")
+		}
+		line := scanner.Text()
+		if line == "" {
+			sawEOF = true
+			continue
+		}
+		fields := strings.Split(line, " ")
+		if len(fields) != 2 {
+			return nil, fmt.Errorf("Malformed index line %q: %d fields", line, len(fields))
+		}
+		mtime, err := strconv.ParseInt(fields[1], 10, 64)
+		if err != nil {
+			return nil, fmt.Errorf("Malformed index line %q: mtime: %v", line, err)
+		}
+		entries = append(entries, KeepServiceIndexEntry{
+			SizedDigest: SizedDigest(fields[0]),
+			Mtime:       mtime,
+		})
+	}
+	if err := scanner.Err(); err != nil {
+		return nil, fmt.Errorf("Error scanning index response: %v", err)
+	}
+	if !sawEOF {
+		// A missing blank line means the response was cut short.
+		return nil, fmt.Errorf("Index response had no EOF marker")
+	}
+	return entries, nil
+}
diff --git a/sdk/go/x/arvados/user.go b/sdk/go/x/arvados/user.go
new file mode 100644
index 0000000..cd506ee
--- /dev/null
+++ b/sdk/go/x/arvados/user.go
@@ -0,0 +1,14 @@
+package arvados
+
+// User holds the user attributes this package decodes from JSON.
+type User struct {
+ UUID string `json:"uuid,omitempty"`
+ IsActive bool `json:"is_active"`
+ IsAdmin bool `json:"is_admin"`
+ Username string `json:"username,omitempty"`
+}
+
+// CurrentUser requests "arvados/v1/users/current" and returns the
+// decoded User, i.e. the user the client's token belongs to.
+func (c *Client) CurrentUser() (User, error) {
+	u := User{}
+	if err := c.RequestAndDecode(&u, "GET", "arvados/v1/users/current", nil, nil); err != nil {
+		return u, err
+	}
+	return u, nil
+}
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
new file mode 100644
index 0000000..1b42b77
--- /dev/null
+++ b/services/keep-balance/balance.go
@@ -0,0 +1,359 @@
+package main
+
+import (
+ "fmt"
+ "log"
+ "runtime"
+ "sync"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "git.curoverse.com/arvados.git/sdk/go/x/arvados"
+)
+
+// Balancer collects the current and desired replication state of
+// every block and computes the changes needed on each Keep service.
+type Balancer struct {
+ // BlockStateMap and KeepServices are (re)initialized by
+ // GetCurrentState.
+ *BlockStateMap
+ // KeepServices is keyed by service UUID.
+ KeepServices map[string]*KeepService
+ // DefaultReplication is used for collections that have no
+ // ReplicationDesired value. GetCurrentState sets it from the
+ // discovery document.
+ DefaultReplication int
+ // ServiceTypes lists the service types to include; services of
+ // any other type are skipped by addKeepService.
+ ServiceTypes []string
+ // Logger receives Logf output; if nil, Logf does nothing.
+ Logger *log.Logger
+ // MinMtime is set by GetCurrentState to the current Unix time
+ // minus the discovery document's BlobSignatureTTL. balanceBlock
+ // only trashes replicas with an Mtime older than this.
+ MinMtime int64
+
+ // collScanned counts collections added by GetCurrentState.
+ collScanned int
+ // serviceRoots is filled by ComputeChangeSets, mapping each
+ // service UUID to itself.
+ serviceRoots map[string]string
+ // errors holds deferred errors recorded by addCollection and
+ // reported by CheckSanityLate.
+ errors []error
+ // mutex is held by addCollection while appending to errors.
+ mutex sync.Mutex
+}
+
+// addCollection raises the desired replication of every block
+// referenced by coll to the collection's ReplicationDesired, or to
+// bal.DefaultReplication if that is nil.
+//
+// A collection whose manifest cannot be parsed is not a fatal error
+// here: the error is recorded in bal.errors and nil is returned.
+func (bal *Balancer) addCollection(coll arvados.Collection) error {
+	blkids, err := coll.SizedDigests()
+	if err != nil {
+		deferred := fmt.Errorf("%v: %v", coll.UUID, err)
+		bal.mutex.Lock()
+		bal.errors = append(bal.errors, deferred)
+		bal.mutex.Unlock()
+		return nil
+	}
+
+	var repl int
+	if want := coll.ReplicationDesired; want != nil {
+		repl = *want
+	} else {
+		repl = bal.DefaultReplication
+	}
+	debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
+	bal.BlockStateMap.IncreaseDesired(repl, blkids)
+	return nil
+}
+
+// addKeepService registers srv in bal.KeepServices if its service
+// type is listed in bal.ServiceTypes; otherwise it logs that the
+// service is skipped. It always returns nil.
+func (bal *Balancer) addKeepService(srv arvados.KeepService) error {
+	wanted := false
+	for _, t := range bal.ServiceTypes {
+		if t == srv.ServiceType {
+			wanted = true
+			break
+		}
+	}
+	if !wanted {
+		bal.Logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
+		return nil
+	}
+	bal.KeepServices[srv.UUID] = &KeepService{KeepService: srv}
+	return nil
+}
+
+// Logf writes a formatted message to bal.Logger. It does nothing if
+// no Logger is set.
+func (bal *Balancer) Logf(f string, args ...interface{}) {
+	logger := bal.Logger
+	if logger == nil {
+		return
+	}
+	logger.Printf(f, args...)
+}
+
+// CheckSanityEarly verifies, before any other work is done, that the
+// client's token belongs to a user that is both active and admin.
+func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
+	u, err := c.CurrentUser()
+	switch {
+	case err != nil:
+		return fmt.Errorf("CurrentUser(): %v", err)
+	case u.IsActive && u.IsAdmin:
+		return nil
+	default:
+		return fmt.Errorf("Current user (%s) is not an active admin user", u.UUID)
+	}
+}
+
+// CheckSanityLate checks the state gathered by GetCurrentState. It
+// returns an error if any deferred errors were recorded (each one is
+// logged first), if no collections were scanned, or if no block has
+// a desired replication above zero.
+func (bal *Balancer) CheckSanityLate() error {
+	if bal.errors != nil {
+		for i := range bal.errors {
+			log.Println("deferred error:", bal.errors[i])
+		}
+		return fmt.Errorf("cannot proceed safely after deferred errors")
+	}
+
+	if bal.collScanned == 0 {
+		return fmt.Errorf("received zero collections")
+	}
+
+	desiredSeen := false
+	bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
+		desiredSeen = desiredSeen || blk.Desired > 0
+	})
+	if !desiredSeen {
+		return fmt.Errorf("zero blocks have desired replication>0")
+	}
+
+	// TODO: no two services have identical indexes
+	// TODO: no collisions (same md5, different size)
+	return nil
+}
+
+// GetCurrentState determines the current replication state, and the
+// desired replication level, for every block that is either
+// retrievable or referenced.
+//
+// It determines the current replication state by reading the block index
+// from every known Keep service.
+//
+// It determines the desired replication level by retrieving all
+// collection manifests in the database (API server).
+//
+// It encodes the resulting information in BlockStateMap.
+//
+// Index retrieval and collection retrieval run concurrently.
+// GetCurrentState waits for all of them to finish and returns the
+// first error reported, or nil.
+func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
+	defer timeMe()("GetCurrentState")
+	bal.BlockStateMap = NewBlockStateMap()
+	bal.KeepServices = make(map[string]*KeepService)
+
+	dd, err := c.DiscoveryDocument()
+	if err != nil {
+		return err
+	}
+	bal.DefaultReplication = dd.DefaultCollectionReplication
+	bal.MinMtime = time.Now().Unix() - dd.BlobSignatureTTL
+
+	if err := c.EachKeepService(bal.addKeepService); err != nil {
+		return err
+	}
+
+	// Every goroutine below sends at most one error, so a buffer
+	// of this size guarantees that no send ever blocks. A buffered
+	// channel is also what makes the len(errs) check in the
+	// collection callback meaningful.
+	errs := make(chan error, len(bal.KeepServices)+2)
+	var wg sync.WaitGroup
+
+	for _, srv := range bal.KeepServices {
+		wg.Add(1)
+		go func(srv *KeepService) {
+			defer wg.Done()
+			bal.Logf("%s: retrieve index", srv)
+			idx, err := srv.Index(c, "")
+			if err != nil {
+				errs <- fmt.Errorf("%s: %v", srv, err)
+				return
+			}
+			bal.Logf("%s: add replicas to map", srv)
+			bal.BlockStateMap.AddReplicas(srv, idx)
+			bal.Logf("%s: done", srv)
+		}(srv)
+	}
+
+	collQ := make(chan arvados.Collection, 1000)
+
+	// Consumer: adds queued collections to the block state map.
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for coll := range collQ {
+			err := bal.addCollection(coll)
+			if err != nil {
+				errs <- err
+				// Drain the queue so the producer is
+				// never blocked on a full channel.
+				for range collQ {
+				}
+				return
+			}
+			bal.collScanned++
+		}
+	}()
+
+	// Producer: fetches collections and queues them.
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		// err is local to this goroutine; the outer err must
+		// not be written concurrently.
+		err := c.EachCollection(
+			func(coll arvados.Collection) error {
+				collQ <- coll
+				if len(errs) > 0 {
+					// some other GetCurrentState
+					// error happened: no point
+					// getting any more
+					// collections.
+					return fmt.Errorf("")
+				}
+				return nil
+			}, func(done, total int) {
+				bal.Logf("collections: %d/%d", done, total)
+			})
+		close(collQ)
+		if err != nil {
+			errs <- err
+		}
+	}()
+
+	// Wait for every goroutine so none is left running, or blocked,
+	// after we return. Receiving from the closed channel yields the
+	// first queued error, or nil if there was none.
+	wg.Wait()
+	close(errs)
+	return <-errs
+}
+
+// ComputeChangeSets compares, for each known block, the current and
+// desired replication states. If it is possible to get closer to the
+// desired state by copying or deleting blocks, it submits those
+// changes to the relevant KeepServices' ChangeSets.
+//
+// Blocks are handed to a pool of 1+NumCPU workers, each of which
+// calls balanceBlock.
+func (bal *Balancer) ComputeChangeSets() {
+	defer timeMe()("ComputeChangeSets")
+
+	roots := make(map[string]string)
+	for _, srv := range bal.KeepServices {
+		roots[srv.UUID] = srv.UUID
+	}
+	bal.serviceRoots = roots
+
+	type balanceTask struct {
+		blkid arvados.SizedDigest
+		blk   *BlockState
+	}
+	todo := make(chan balanceTask, 16)
+
+	var wg sync.WaitGroup
+	workers := 1 + runtime.NumCPU()
+	wg.Add(workers)
+	for n := 0; n < workers; n++ {
+		go func() {
+			defer wg.Done()
+			for task := range todo {
+				bal.balanceBlock(task.blkid, task.blk)
+			}
+		}()
+	}
+
+	bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
+		todo <- balanceTask{blkid: blkid, blk: blk}
+	})
+	close(todo)
+	wg.Wait()
+}
+
+// balanceBlock compares current state to desired state for a single
+// block, and makes the appropriate ChangeSet calls.
+//
+// Services are visited in the order returned by the keepclient root
+// sorter for this block (the rendezvous order, per the comments
+// below). A service that holds a replica may get a Trash request; a
+// writable service that holds none may get a Pull request.
+func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
+ debugf("balanceBlock: %v %+v", blkid, blk)
+ // Only the first 32 bytes of blkid (the hash) are used for
+ // sorting; blkid[:32] panics if blkid is shorter than that.
+ uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
+ // hasRepl maps service UUID to the replica held by that service.
+ hasRepl := make(map[string]Replica, len(bal.serviceRoots))
+ for _, repl := range blk.Replicas {
+ hasRepl[repl.UUID] = repl
+ }
+ // To be safe we assume two replicas with the same Mtime are
+ // in fact the same replica being reported more than
+ // once. len(uniqueBestRepl) is the number of distinct
+ // replicas in the best rendezvous positions we've considered
+ // so far.
+ uniqueBestRepl := make(map[int64]bool, len(bal.serviceRoots))
+ // pulls is the number of Pull changes we have already
+ // requested. (For purposes of deciding whether to Pull to
+ // rendezvous position N, we should assume all pulls we have
+ // requested on rendezvous positions M<N will be successful.)
+ pulls := 0
+ for _, uuid := range uuids {
+ srv := bal.KeepServices[uuid]
+ // TODO: request a Touch if Mtime is duplicated.
+ if repl, ok := hasRepl[srv.UUID]; ok {
+ // This service has a replica. We should
+ // delete it if [1] we already have enough
+ // distinct replicas in better rendezvous
+ // positions, [2] this replica's Mtime is
+ // distinct from the better replicas' Mtime,
+ // and [3] this replica's Mtime is older than
+ // bal.MinMtime.
+ if len(uniqueBestRepl) >= blk.Desired &&
+ !uniqueBestRepl[repl.Mtime] &&
+ repl.Mtime < bal.MinMtime {
+ srv.AddTrash(Trash{
+ SizedDigest: blkid,
+ Mtime: repl.Mtime,
+ })
+ }
+ // The replica is counted as existing whether or not a
+ // Trash was just requested for it.
+ uniqueBestRepl[repl.Mtime] = true
+ } else if pulls+len(uniqueBestRepl) < blk.Desired &&
+ len(blk.Replicas) > 0 &&
+ !srv.ReadOnly {
+ // This service doesn't have a replica. We
+ // should pull one to this server if we don't
+ // already have enough (existing+requested)
+ // replicas in better rendezvous positions.
+ // The first known replica is always used as the
+ // source.
+ srv.AddPull(Pull{
+ SizedDigest: blkid,
+ Source: blk.Replicas[0].KeepService,
+ })
+ pulls++
+ }
+ }
+}
+
+// blocksNBytes is a running total of replicas, blocks and bytes,
+// used for the statistics report.
+type blocksNBytes struct {
+	replicas int
+	blocks   int
+	bytes    int64
+}
+
+// String formats the totals for the statistics report.
+func (bb blocksNBytes) String() string {
+	const layout = "%d replicas (%d blocks, %d bytes)"
+	return fmt.Sprintf(layout, bb.replicas, bb.blocks, bb.bytes)
+}
+
+// PrintStatistics prints to standard output a summary of how many
+// replicas, blocks and bytes fall into each replication category,
+// followed by the ChangeSet of every Keep service.
+//
+// Every block is counted in exactly one category: the first switch
+// case that matches wins.
+func (bal *Balancer) PrintStatistics() {
+ var lost, overrep, unref, garbage, underrep, justright, desired, current blocksNBytes
+ bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
+ // surplus is negative when the block has fewer replicas
+ // than desired.
+ surplus := len(blk.Replicas) - blk.Desired
+ bytes := blkid.Size()
+ switch {
+ case len(blk.Replicas) == 0 && blk.Desired > 0:
+ // Wanted, but no service has it.
+ lost.replicas -= surplus
+ lost.blocks++
+ lost.bytes += bytes * int64(-surplus)
+ case len(blk.Replicas) < blk.Desired:
+ // Counts the missing replicas, not the existing ones.
+ underrep.replicas -= surplus
+ underrep.blocks++
+ underrep.bytes += bytes * int64(-surplus)
+ case len(blk.Replicas) > 0 && blk.Desired == 0:
+ // Not wanted at all: "unreferenced" if any replica
+ // is at least as new as MinMtime, otherwise
+ // "garbage".
+ counter := &garbage
+ for _, r := range blk.Replicas {
+ if r.Mtime >= bal.MinMtime {
+ counter = &unref
+ break
+ }
+ }
+ counter.replicas += surplus
+ counter.blocks++
+ counter.bytes += bytes * int64(surplus)
+ case len(blk.Replicas) > blk.Desired:
+ // Counts only the excess replicas.
+ overrep.replicas += surplus
+ overrep.blocks++
+ overrep.bytes += bytes * int64(len(blk.Replicas)-blk.Desired)
+ default:
+ justright.replicas += blk.Desired
+ justright.blocks++
+ justright.bytes += bytes * int64(blk.Desired)
+ }
+
+ // Totals, independent of the category above.
+ if blk.Desired > 0 {
+ desired.replicas += blk.Desired
+ desired.blocks++
+ desired.bytes += bytes * int64(blk.Desired)
+ }
+ current.replicas += len(blk.Replicas)
+ current.blocks++
+ current.bytes += bytes * int64(len(blk.Replicas))
+ })
+ fmt.Println("===")
+ fmt.Println(lost, "lost (0=have<want)")
+ fmt.Println(underrep, "underreplicated (0<have<want)")
+ fmt.Println(justright, "just right (have=want)")
+ fmt.Println(overrep, "overreplicated (have>want>0)")
+ fmt.Println(unref, "unreferenced (have>want=0, new)")
+ fmt.Println(garbage, "garbage (have>want=0, old)")
+ fmt.Println("===")
+ fmt.Println(desired, "total commitment (excluding unreferenced)")
+ fmt.Println(current, "total usage")
+ fmt.Println("===")
+ for _, srv := range bal.KeepServices {
+ fmt.Printf("%s: %v\n", srv, srv.ChangeSet)
+ }
+ fmt.Println("===")
+}
+
+// ApplyChangeSets is meant to send the computed ChangeSets to the
+// Keep services. It is not implemented yet: after re-running
+// CheckSanityLate it always returns an error.
+func (bal *Balancer) ApplyChangeSets(c *arvados.Client) error {
+	defer timeMe()("ApplyChangeSets")
+	err := bal.CheckSanityLate()
+	if err != nil {
+		return err
+	}
+	return fmt.Errorf("unimplemented (FIXME)")
+}
diff --git a/services/keep-balance/block_state.go b/services/keep-balance/block_state.go
new file mode 100644
index 0000000..f8edec5
--- /dev/null
+++ b/services/keep-balance/block_state.go
@@ -0,0 +1,87 @@
+package main
+
+import (
+ "sync"
+
+ "git.curoverse.com/arvados.git/sdk/go/x/arvados"
+)
+
+// Replica is one copy of a block: the service that reported it and
+// the Mtime from that service's index.
+type Replica struct {
+ *KeepService
+ Mtime int64
+}
+
+// BlockState is what is known about one block: the replicas that
+// exist and the number of replicas desired.
+type BlockState struct {
+	Replicas []Replica
+	Desired  int
+}
+
+// AddReplica records one more existing replica.
+func (bs *BlockState) AddReplica(r Replica) {
+	bs.Replicas = append(bs.Replicas, r)
+}
+
+// IncreaseDesired raises Desired to n. It never lowers it.
+func (bs *BlockState) IncreaseDesired(n int) {
+	if n <= bs.Desired {
+		return
+	}
+	bs.Desired = n
+}
+
+// BlockStateMap is a goroutine-safe wrapper around a
+// map[arvados.SizedDigest]*BlockState.
+type BlockStateMap struct {
+	entries map[arvados.SizedDigest]*BlockState
+	mutex   sync.Mutex
+}
+
+// NewBlockStateMap returns an empty, ready-to-use BlockStateMap.
+func NewBlockStateMap() *BlockStateMap {
+	bsm := new(BlockStateMap)
+	bsm.entries = make(map[arvados.SizedDigest]*BlockState)
+	return bsm
+}
+
+// Get returns a BlockState entry, allocating a new one if needed.
+//
+// Get does not take the mutex itself; AddReplicas and
+// IncreaseDesired call it while holding the lock.
+func (bsm *BlockStateMap) Get(blkid arvados.SizedDigest) *BlockState {
+	// TODO? Allocate BlockState structs a slice at a time,
+	// instead of one at a time.
+	if existing := bsm.entries[blkid]; existing != nil {
+		return existing
+	}
+	fresh := &BlockState{}
+	bsm.entries[blkid] = fresh
+	return fresh
+}
+
+// Apply runs f on each entry in the map. The mutex is held for the
+// whole iteration.
+func (bsm *BlockStateMap) Apply(f func(arvados.SizedDigest, *BlockState)) {
+	bsm.mutex.Lock()
+	defer bsm.mutex.Unlock()
+
+	for id, state := range bsm.entries {
+		f(id, state)
+	}
+}
+
+// AddReplicas updates the map to indicate srv has a replica of each
+// block in idx.
+func (bsm *BlockStateMap) AddReplicas(srv *KeepService, idx []arvados.KeepServiceIndexEntry) {
+	bsm.mutex.Lock()
+	defer bsm.mutex.Unlock()
+
+	for _, ent := range idx {
+		replica := Replica{KeepService: srv, Mtime: ent.Mtime}
+		bsm.Get(ent.SizedDigest).AddReplica(replica)
+	}
+}
+
+// IncreaseDesired updates the map to indicate the desired replication
+// for the given blocks is at least n.
+func (bsm *BlockStateMap) IncreaseDesired(n int, blocks []arvados.SizedDigest) {
+	bsm.mutex.Lock()
+	defer bsm.mutex.Unlock()
+
+	for i := range blocks {
+		bsm.Get(blocks[i]).IncreaseDesired(n)
+	}
+}
diff --git a/services/keep-balance/change_set.go b/services/keep-balance/change_set.go
new file mode 100644
index 0000000..6b79b18
--- /dev/null
+++ b/services/keep-balance/change_set.go
@@ -0,0 +1,50 @@
+package main
+
+import (
+ "fmt"
+ "sync"
+
+ "git.curoverse.com/arvados.git/sdk/go/x/arvados"
+)
+
+// Pull is a request to retrieve a block from a remote server, and
+// store it locally.
+type Pull struct {
+ // SizedDigest identifies the block to retrieve.
+ arvados.SizedDigest
+ // Source is the service to retrieve the block from.
+ Source *KeepService
+}
+
+// Trash is a request to delete a block.
+type Trash struct {
+ // SizedDigest identifies the block to delete.
+ arvados.SizedDigest
+ // Mtime is the Mtime of the replica the request was computed
+ // for (see balanceBlock).
+ Mtime int64
+}
+
+// ChangeSet is a set of change requests that will be sent to a
+// keepstore server.
+type ChangeSet struct {
+	Pulls   []Pull
+	Trashes []Trash
+	mutex   sync.Mutex
+}
+
+// AddPull adds a Pull operation.
+func (cs *ChangeSet) AddPull(p Pull) {
+	cs.mutex.Lock()
+	defer cs.mutex.Unlock()
+	cs.Pulls = append(cs.Pulls, p)
+}
+
+// AddTrash adds a Trash operation
+func (cs *ChangeSet) AddTrash(t Trash) {
+	cs.mutex.Lock()
+	defer cs.mutex.Unlock()
+	cs.Trashes = append(cs.Trashes, t)
+}
+
+// String implements fmt.Stringer. It reports the number of Pull and
+// Trash operations.
+//
+// NOTE(review): the value receiver means this method works on a
+// copy of the ChangeSet, mutex included, so the lock taken here does
+// not exclude concurrent AddPull/AddTrash calls on the original.
+func (cs ChangeSet) String() string {
+	cs.mutex.Lock()
+	defer cs.mutex.Unlock()
+	nPulls, nTrashes := len(cs.Pulls), len(cs.Trashes)
+	return fmt.Sprintf("ChangeSet{Pulls:%d, Trashes:%d}", nPulls, nTrashes)
+}
diff --git a/services/keep-balance/example-config.json b/services/keep-balance/example-config.json
new file mode 100644
index 0000000..a6a6e58
--- /dev/null
+++ b/services/keep-balance/example-config.json
@@ -0,0 +1,11 @@
+{
+ "Client": {
+ "APIHost": "4xphq.arvadosapi.com:443",
+ "AuthToken": "zzzzzz",
+ "Insecure": false
+ },
+ "Apply": false,
+ "ServiceTypes": [
+ "disk"
+ ]
+}
diff --git a/services/keep-balance/keep_service.go b/services/keep-balance/keep_service.go
new file mode 100644
index 0000000..f4db28e
--- /dev/null
+++ b/services/keep-balance/keep_service.go
@@ -0,0 +1,15 @@
+package main
+
+import (
+ "fmt"
+ "git.curoverse.com/arvados.git/sdk/go/x/arvados"
+)
+
+// KeepService pairs an arvados.KeepService record with the ChangeSet
+// being computed for it.
+type KeepService struct {
+ arvados.KeepService
+ ChangeSet
+}
+
+// String implements fmt.Stringer, returning
+// "UUID (host:port, type)". Both embedded types define String, so
+// this method is what gives KeepService an unambiguous one.
+//
+// NOTE(review): the value receiver copies the whole struct,
+// including the mutex inside ChangeSet, on every call.
+func (srv KeepService) String() string {
+ return fmt.Sprintf("%s (%s:%d, %s)", srv.UUID, srv.ServiceHost, srv.ServicePort, srv.ServiceType)
+}
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
new file mode 100644
index 0000000..25ed818
--- /dev/null
+++ b/services/keep-balance/main.go
@@ -0,0 +1,71 @@
+package main
+
+import (
+ "encoding/json"
+ "flag"
+ "io/ioutil"
+ "log"
+ "os"
+
+ "git.curoverse.com/arvados.git/sdk/go/x/arvados"
+)
+
+// Config is the content of the JSON configuration file.
+type Config struct {
+ // Client holds the API host and credentials to use.
+ Client arvados.Client
+ // Apply controls whether Run calls ApplyChangeSets after
+ // computing and printing the changes.
+ Apply bool
+ // ServiceTypes lists the Keep service types to balance.
+ ServiceTypes []string
+}
+
+// debugf logs a debug message. It is a no-op unless main replaces it
+// with log.Printf because the -debug flag was given.
+var debugf func(string, ...interface{}) = func(string, ...interface{}) {}
+
+// main parses the command line flags, loads the JSON configuration
+// file, optionally enables debug logging, and calls Run. Any failure
+// is fatal.
+func main() {
+	var config Config
+
+	defaultConfig := os.Getenv("HOME") + "/.config/arvados/keep-balance.json"
+	configPath := flag.String("config", defaultConfig, "configuration file")
+	debugFlag := flag.Bool("debug", false, "enable debug messages")
+	flag.Parse()
+
+	buf, err := ioutil.ReadFile(*configPath)
+	if err != nil {
+		log.Fatalf("Reading %q: %v", *configPath, err)
+	}
+	if err = json.Unmarshal(buf, &config); err != nil {
+		log.Fatalf("Decoding %q: %v", *configPath, err)
+	}
+
+	if *debugFlag {
+		debugf = log.Printf
+		j, err := json.Marshal(config)
+		if err != nil {
+			log.Fatal(err)
+		}
+		log.Printf("config is %s", j)
+	}
+
+	if err := Run(config); err != nil {
+		log.Fatal(err)
+	}
+}
+
+// Run performs one balancing pass with the given configuration:
+// early sanity check, state retrieval, change set computation,
+// statistics report and late sanity check. The changes are applied
+// only if config.Apply is set. Run returns the first error
+// encountered.
+func Run(config Config) error {
+	client := &config.Client
+	bal := Balancer{
+		Logger:       log.New(os.Stderr, "", log.LstdFlags),
+		ServiceTypes: config.ServiceTypes,
+	}
+
+	if err := bal.CheckSanityEarly(client); err != nil {
+		return err
+	}
+	if err := bal.GetCurrentState(client); err != nil {
+		return err
+	}
+	bal.ComputeChangeSets()
+	bal.PrintStatistics()
+	if err := bal.CheckSanityLate(); err != nil {
+		return err
+	}
+	if !config.Apply {
+		return nil
+	}
+	return bal.ApplyChangeSets(client)
+}
diff --git a/services/keep-balance/time_me.go b/services/keep-balance/time_me.go
new file mode 100644
index 0000000..549ccd7
--- /dev/null
+++ b/services/keep-balance/time_me.go
@@ -0,0 +1,13 @@
+package main
+
+import (
+ "fmt"
+ "time"
+)
+
+// timeMe notes the current time and returns a function that prints
+// "<label>: <elapsed>" to standard output when called. Typical use:
+//
+//	defer timeMe()("label")
+func timeMe() func(string) {
+	start := time.Now()
+	report := func(label string) {
+		elapsed := time.Since(start)
+		fmt.Printf("%s: %v\n", label, elapsed)
+	}
+	return report
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list