[ARVADOS] created: 6be020502307375cb59e782f9fd2437b7772c692
Git user
git at public.curoverse.com
Sat May 14 01:20:26 EDT 2016
at 6be020502307375cb59e782f9fd2437b7772c692 (commit)
commit 6be020502307375cb59e782f9fd2437b7772c692
Author: Tom Clegg <tom at curoverse.com>
Date: Sat May 14 01:20:01 2016 -0400
9162: Add keep-balance
diff --git a/build/run-tests.sh b/build/run-tests.sh
index d229341..66e38b7 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -70,6 +70,7 @@ services/fuse
services/keep-web
services/keepproxy
services/keepstore
+services/keep-balance
services/login-sync
services/nodemanager
services/crunch-run
@@ -714,6 +715,7 @@ gostuff=(
services/keep-web
services/keepstore
sdk/go/keepclient
+ services/keep-balance
services/keepproxy
services/datamanager/summary
services/datamanager/collection
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
new file mode 100644
index 0000000..284b079
--- /dev/null
+++ b/services/keep-balance/balance.go
@@ -0,0 +1,316 @@
+package main
+
+import (
+ "fmt"
+ "log"
+ "runtime"
+ "sync"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+)
+
+type Balancer struct {
+ *BlockStateMap
+ KeepServices map[string]*KeepService
+ DefaultReplication int
+ serviceRoots map[string]string
+ Logger *log.Logger
+ MinMtime int64
+}
+
+func (bal *Balancer) addCollection(coll Collection) error {
+ blkids, err := coll.SizedDigests()
+ if err != nil {
+ return fmt.Errorf("%v: %v", coll.UUID, err)
+ }
+ repl := bal.DefaultReplication
+ if coll.ReplicationDesired != nil {
+ repl = *coll.ReplicationDesired
+ }
+ debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
+ bal.BlockStateMap.IncreaseDesired(repl, blkids)
+ return nil
+}
+
+func (bal *Balancer) addKeepService(srv KeepServiceModel) error {
+ switch srv.ServiceType {
+ case "disk", "azure":
+ bal.KeepServices[srv.UUID] = &KeepService{
+ KeepServiceModel: srv,
+ }
+ return nil
+ case "proxy":
+ return nil
+ default:
+ return fmt.Errorf("%v has unexpected service type %q", srv.UUID, srv.ServiceType)
+ }
+}
+
+func (bal *Balancer) Logf(f string, args ...interface{}) {
+ if bal.Logger != nil {
+ bal.Logger.Printf(f, args...)
+ }
+}
+
+func (bal *Balancer) CheckSanity() error {
+ // TODO: current user is admin
+ // TODO: number of collections > 0
+ // TODO: some blocks have desired > 0
+ // TODO: no two services have identical indexes
+ // TODO: no collisions (same md5, different size)
+ return nil
+}
+
+// GetCurrentState determines the current replication state, and the
+// desired replication level, for every block that is either
+// retrievable or referenced.
+//
+// It determines the current replication state by reading the block index
+// from every known Keep service.
+//
+// It determines the desired replication level by retrieving all
+// collection manifests in the database (API server).
+//
+// It encodes the resulting information in BlockStateMap.
+func (bal *Balancer) GetCurrentState(c *Client) error {
+ defer timeMe()("GetCurrentState")
+ bal.BlockStateMap = NewBlockStateMap()
+ bal.KeepServices = make(map[string]*KeepService)
+
+ dd, err := c.DiscoveryDocument()
+ if err != nil {
+ return err
+ }
+ bal.DefaultReplication = dd.DefaultCollectionReplication
+ bal.MinMtime = time.Now().Unix() - dd.BlobSignatureTTL
+
+ errs := make(chan error)
+ wg := sync.WaitGroup{}
+
+ if err := c.EachKeepServiceModel(bal.addKeepService); err != nil {
+ return err
+ }
+ for _, srv := range bal.KeepServices {
+ wg.Add(1)
+ go func(srv *KeepService) {
+ defer wg.Done()
+ bal.Logf("%s: retrieve index", srv)
+ idx, err := srv.Index(c, "")
+ if err != nil {
+ errs <- fmt.Errorf("%s: %v", srv, err)
+ return
+ }
+ bal.Logf("%s: add replicas to map", srv)
+ bal.BlockStateMap.AddReplicas(srv, idx)
+ bal.Logf("%s: done", srv)
+ }(srv)
+ }
+
+ collQ := make(chan Collection, 200)
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for coll := range collQ {
+ err := bal.addCollection(coll)
+ if err != nil {
+ errs <- err
+ for _ = range collQ {
+ }
+ return
+ }
+ }
+ }()
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ err = c.EachCollection(
+ func(coll Collection) error {
+ collQ <- coll
+ if len(errs) > 0 {
+ // some other GetCurrentState
+ // error happened: no point
+ // getting any more
+ // collections.
+ return fmt.Errorf("")
+ }
+ return nil
+ }, func(done, total int) {
+ bal.Logf("collections: %d/%d", done, total)
+ })
+ close(collQ)
+ if err != nil {
+ errs <- err
+ }
+ }()
+
+ go func() {
+ wg.Wait()
+ close(errs)
+ }()
+ return <-errs
+}
+
+// ComputeChangeSets compares, for each known block, the current and
+// desired replication states. If it is possible to get closer to the
+// desired state by copying or deleting blocks, it submits those
+// changes to the relevant KeepServices' ChangeSets.
+func (bal *Balancer) ComputeChangeSets() {
+ defer timeMe()("ComputeChangeSets")
+ bal.serviceRoots = make(map[string]string)
+ for _, srv := range bal.KeepServices {
+ bal.serviceRoots[srv.UUID] = srv.UUID
+ }
+
+ type balanceTask struct {
+ blkid SizedDigest
+ blk *BlockState
+ }
+ todo := make(chan balanceTask, 16)
+ var wg sync.WaitGroup
+ for i := 0; i < runtime.NumCPU(); i++ {
+ wg.Add(1)
+ go func() {
+ for work := range todo {
+ bal.balanceBlock(work.blkid, work.blk)
+ }
+ wg.Done()
+ }()
+ }
+ bal.BlockStateMap.Apply(func(blkid SizedDigest, blk *BlockState) {
+ todo <- balanceTask{
+ blkid: blkid,
+ blk: blk,
+ }
+ })
+ close(todo)
+ wg.Wait()
+}
+
+// balanceBlock compares current state to desired state for a single
+// block, and makes the appropriate ChangeSet calls.
+func (bal *Balancer) balanceBlock(blkid SizedDigest, blk *BlockState) {
+ debugf("balanceBlock: %v %+v", blkid, blk)
+ uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
+ hasRepl := make(map[string]Replica, len(bal.serviceRoots))
+ for _, repl := range blk.Replicas {
+ hasRepl[repl.UUID] = repl
+ }
+ // To be safe we assume two replicas with the same Mtime are
+ // in fact the same replica being reported more than
+ // once. len(uniqueBestRepl) is the number of distinct
+ // replicas in the best rendezvous positions we've considered
+ // so far.
+ uniqueBestRepl := make(map[int64]bool, len(bal.serviceRoots))
+ // pulls is the number of Pull changes we have already
+ // requested. (For purposes of deciding whether to Pull to
+ // rendezvous position N, we should assume all pulls we have
+ // requested on rendezvous positions M<N will be successful.)
+ pulls := 0
+ for _, uuid := range uuids {
+ srv := bal.KeepServices[uuid]
+ // TODO: request a Touch if Mtime is duplicated.
+ if repl, ok := hasRepl[srv.UUID]; ok {
+ // This service has a replica. We should
+ // delete it if [1] we already have enough
+ // distinct replicas in better rendezvous
+ // positions and [2] this replica's Mtime is
+ // distinct from the better replicas' Mtime.
+ if len(uniqueBestRepl) >= blk.Desired &&
+ !uniqueBestRepl[repl.Mtime] {
+ // FIXME: check Mtime against discovery doc TTL
+ srv.AddTrash(Trash{
+ SizedDigest: blkid,
+ Mtime: repl.Mtime,
+ })
+ }
+ uniqueBestRepl[repl.Mtime] = true
+ } else if pulls+len(uniqueBestRepl) < blk.Desired &&
+ len(blk.Replicas) > 0 &&
+ !srv.ReadOnly {
+ // This service doesn't have a replica. We
+ // should pull one to this server if we don't
+ // already have enough (existing+requested)
+ // replicas in better rendezvous positions.
+ srv.AddPull(Pull{
+ SizedDigest: blkid,
+ Source: blk.Replicas[0].KeepService,
+ })
+ pulls++
+ }
+ }
+}
+
// blocksNBytes accumulates replica, block, and byte counts for one
// category of blocks reported by PrintStatistics.
type blocksNBytes struct {
	replicas int
	blocks   int
	bytes    int64
}

// String implements fmt.Stringer.
func (bb blocksNBytes) String() string {
	const layout = "%d replicas (%d blocks, %d bytes)"
	return fmt.Sprintf(layout, bb.replicas, bb.blocks, bb.bytes)
}
+
+func (bal *Balancer) PrintStatistics() {
+ var lost, overrep, unref, garbage, underrep, justright, ideal, actual blocksNBytes
+ bal.BlockStateMap.Apply(func(blkid SizedDigest, blk *BlockState) {
+ surplus := len(blk.Replicas) - blk.Desired
+ bytes := blkid.Size()
+ if len(blk.Replicas) == 0 && blk.Desired > 0 {
+ lost.replicas -= surplus
+ lost.blocks++
+ lost.bytes += bytes
+ } else if len(blk.Replicas) < blk.Desired {
+ underrep.replicas -= surplus
+ underrep.blocks++
+ underrep.bytes += bytes * int64(-surplus)
+ } else if len(blk.Replicas) > 0 && blk.Desired == 0 {
+ // Weird case, not all replicas are in the same category
+ countAsWhat := &garbage.blocks
+ for _, r := range blk.Replicas {
+ if r.Mtime < bal.MinMtime {
+ garbage.replicas++
+ garbage.bytes += bytes
+ } else {
+ countAsWhat = &unref.blocks
+ unref.replicas++
+ unref.bytes += bytes
+ }
+ }
+ *countAsWhat++
+ } else if len(blk.Replicas) > blk.Desired {
+ overrep.replicas += surplus
+ overrep.blocks++
+ overrep.bytes += bytes * int64(len(blk.Replicas)-blk.Desired)
+ } else {
+ justright.replicas += blk.Desired
+ justright.blocks++
+ justright.bytes += bytes * int64(blk.Desired)
+ }
+
+ if blk.Desired > 0 {
+ ideal.replicas += blk.Desired
+ ideal.blocks++
+ ideal.bytes += bytes * int64(blk.Desired)
+ }
+ actual.replicas += len(blk.Replicas)
+ actual.blocks++
+ actual.bytes += bytes * int64(len(blk.Replicas))
+ })
+ fmt.Println(lost, "lost (0=have<want)")
+ fmt.Println(underrep, "underreplicated (0<have<want)")
+ fmt.Println(overrep, "overreplicated (have>want>0)")
+ fmt.Println(garbage, "garbage (have>want=0)")
+ fmt.Println(justright, "just right (have=want)")
+ for _, srv := range bal.KeepServices {
+ fmt.Printf("%s: %v\n", srv, srv.ChangeSet)
+ }
+}
+
+func (bal *Balancer) ApplyChangeSets(c *Client) error {
+ defer timeMe()("ApplyChangeSets")
+ // FIXME: do stuff
+ return nil
+}
diff --git a/services/keep-balance/block_state.go b/services/keep-balance/block_state.go
new file mode 100644
index 0000000..c087ecd
--- /dev/null
+++ b/services/keep-balance/block_state.go
@@ -0,0 +1,95 @@
+package main
+
+import (
+ "strconv"
+ "strings"
+ "sync"
+)
+
// SizedDigest is hash+size
type SizedDigest string

// Size returns the size field of the digest: the base-10 number that
// follows the first "+" and runs up to the next "+" (if any). It
// returns 0 if sd contains no "+" or the size field is not a valid
// int64, rather than panicking on malformed input.
func (sd SizedDigest) Size() int64 {
	s := string(sd)
	i := strings.IndexByte(s, '+')
	if i < 0 {
		return 0
	}
	s = s[i+1:]
	if j := strings.IndexByte(s, '+'); j >= 0 {
		s = s[:j]
	}
	n, err := strconv.ParseInt(s, 10, 64)
	if err != nil {
		return 0
	}
	return n
}
+
+type Replica struct {
+ *KeepService
+ Mtime int64
+}
+
+type BlockState struct {
+ Replicas []Replica
+ Desired int
+}
+
+func (bs *BlockState) AddReplica(r Replica) {
+ bs.Replicas = append(bs.Replicas, r)
+}
+
+func (bs *BlockState) IncreaseDesired(n int) {
+ if bs.Desired < n {
+ bs.Desired = n
+ }
+}
+
+// BlockStateMap is a goroutine-safe wrapper around a
+// map[SizedDigest]*BlockState.
+type BlockStateMap struct {
+ entries map[SizedDigest]*BlockState
+ mutex sync.Mutex
+}
+
+func NewBlockStateMap() *BlockStateMap {
+ return &BlockStateMap{
+ entries: make(map[SizedDigest]*BlockState),
+ }
+}
+
+// Get returns a BlockState entry, allocating a new one if needed.
+func (bsm *BlockStateMap) Get(blkid SizedDigest) *BlockState {
+ // TODO? Allocate BlockState structs a slice at a time,
+ // instead of one at a time.
+ blk := bsm.entries[blkid]
+ if blk == nil {
+ blk = &BlockState{}
+ bsm.entries[blkid] = blk
+ }
+ return blk
+}
+
+// Apply runs f on each entry in the map.
+func (bsm *BlockStateMap) Apply(f func(SizedDigest, *BlockState)) {
+ bsm.mutex.Lock()
+ defer bsm.mutex.Unlock()
+
+ for blkid, blk := range bsm.entries {
+ f(blkid, blk)
+ }
+}
+
+// AddReplicas updates the map to indicate srv has a replica of each
+// block in idx.
+func (bsm *BlockStateMap) AddReplicas(srv *KeepService, idx []IndexEntry) {
+ bsm.mutex.Lock()
+ defer bsm.mutex.Unlock()
+
+ for _, ent := range idx {
+ bsm.Get(ent.SizedDigest).AddReplica(Replica{
+ KeepService: srv,
+ Mtime: ent.Mtime,
+ })
+ }
+}
+
+// IncreaseDesired updates the map to indicate the desired replication
+// for the given blocks is at least n.
+func (bsm *BlockStateMap) IncreaseDesired(n int, blocks []SizedDigest) {
+ bsm.mutex.Lock()
+ defer bsm.mutex.Unlock()
+
+ for _, blkid := range blocks {
+ bsm.Get(blkid).IncreaseDesired(n)
+ }
+}
diff --git a/services/keep-balance/change_set.go b/services/keep-balance/change_set.go
new file mode 100644
index 0000000..2090e81
--- /dev/null
+++ b/services/keep-balance/change_set.go
@@ -0,0 +1,48 @@
+package main
+
+import (
+ "fmt"
+ "sync"
+)
+
+// Pull is a request to retrieve a block from a remote server, and
+// store it locally.
+type Pull struct {
+ SizedDigest
+ Source *KeepService
+}
+
+// Trash is a request to delete a block.
+type Trash struct {
+ SizedDigest
+ Mtime int64
+}
+
+// ChangeSet is a set of change requests that will be sent to a
+// keepstore server.
+type ChangeSet struct {
+ Pulls []Pull
+ Trashes []Trash
+ mutex sync.Mutex
+}
+
+// AddPull adds a Pull operation.
+func (cs *ChangeSet) AddPull(p Pull) {
+ cs.mutex.Lock()
+ cs.Pulls = append(cs.Pulls, p)
+ cs.mutex.Unlock()
+}
+
+// AddTrash adds a Trash operation
+func (cs *ChangeSet) AddTrash(t Trash) {
+ cs.mutex.Lock()
+ cs.Trashes = append(cs.Trashes, t)
+ cs.mutex.Unlock()
+}
+
+// String implements fmt.Stringer.
+func (cs ChangeSet) String() string {
+ cs.mutex.Lock()
+ defer cs.mutex.Unlock()
+ return fmt.Sprintf("ChangeSet{Pulls=%d, Trashes=%d}", len(cs.Pulls), len(cs.Trashes))
+}
diff --git a/services/keep-balance/client.go b/services/keep-balance/client.go
new file mode 100644
index 0000000..86d95a2
--- /dev/null
+++ b/services/keep-balance/client.go
@@ -0,0 +1,133 @@
+package main
+
import (
	"crypto/tls"
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"net/url"
	"os"
	"strings"
)
+
+type Client struct {
+ *http.Client
+ APIHost string
+ AuthToken string
+ Insecure bool
+}
+
+func NewClientFromEnv() *Client {
+ return &Client{
+ APIHost: os.Getenv("ARVADOS_API_HOST"),
+ AuthToken: os.Getenv("ARVADOS_API_TOKEN"),
+ Insecure: os.Getenv("ARVADOS_API_HOST_INSECURE") != "",
+ }
+}
+
+// Do adds authentication headers and then calls (*http.Client)Do().
+func (c *Client) Do(req *http.Request) (*http.Response, error) {
+ if c.AuthToken != "" {
+ req.Header.Add("Authorization", "OAuth2 "+c.AuthToken)
+ }
+ debugf("%#v", req)
+ return c.httpClient().Do(req)
+}
+
+// DoAndDecode performs req and unmarshals the response (which must be
+// JSON) into dst. Use this instead of RequestAndDecode if you need
+// more control of the http.Request object.
+func (c *Client) DoAndDecode(dst interface{}, req *http.Request) error {
+ resp, err := c.Do(req)
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+ buf, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return err
+ }
+ if resp.StatusCode != 200 {
+ return fmt.Errorf("request failed (%s): %s", req.URL, resp.Status)
+ }
+ return json.Unmarshal(buf, dst)
+}
+
+// RequestAndDecode performs an API request and unmarshals the
+// response (which must be JSON) into dst. Method and body arguments
+// are the same as for http.NewRequest(). The given path is added to
+// the server's scheme/host/port to form the request URL. The given
+// params are passed via POST form or query string.
+//
+// path must not contain a query string.
+func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error {
+ urlString := c.apiURL(path)
+ var urlValues url.Values
+ if v, ok := params.(url.Values); ok {
+ urlValues = v
+ } else if params != nil {
+ // Convert an arbitrary struct to url.Values. For
+ // example, Foo{Bar: []int{1,2,3}, Baz: "waz"} becomes
+ // url.Values{`bar`:`{"a":[1,2,3]}`,`Baz`:`waz`}
+ //
+ // TODO: Do this more efficiently, possibly using
+ // json.Decode/Encode, so the whole thing doesn't have
+ // to get encoded, decoded, and re-encoded.
+ j, err := json.Marshal(params)
+ if err != nil {
+ return err
+ }
+ var generic map[string]interface{}
+ err = json.Unmarshal(j, &generic)
+ if err != nil {
+ return err
+ }
+ urlValues = url.Values{}
+ for k, v := range generic {
+ if v, ok := v.(string); ok {
+ urlValues.Set(k, v)
+ continue
+ }
+ j, err := json.Marshal(v)
+ if err != nil {
+ return err
+ }
+ urlValues.Set(k, string(j))
+ }
+ }
+ if (method == "GET" || body != nil) && urlValues != nil {
+ // FIXME: what if params don't fit in URL
+ u, err := url.Parse(urlString)
+ if err != nil {
+ return err
+ }
+ u.RawQuery = urlValues.Encode()
+ urlString = u.String()
+ }
+ req, err := http.NewRequest(method, urlString, body)
+ if err != nil {
+ return err
+ }
+ return c.DoAndDecode(dst, req)
+}
+
+func (c *Client) httpClient() *http.Client {
+ if c.Client == nil {
+ return http.DefaultClient
+ }
+ return c.Client
+}
+
+func (c *Client) apiURL(path string) string {
+ return "https://" + c.APIHost + "/" + path
+}
+
+type DiscoveryDocument struct {
+ DefaultCollectionReplication int `json:"defaultCollectionReplication"`
+ BlobSignatureTTL int64 `json:"blobSignatureTtl"`
+}
+
+func (c *Client) DiscoveryDocument() (*DiscoveryDocument, error) {
+ var dd DiscoveryDocument
+ return &dd, c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil)
+}
diff --git a/services/keep-balance/collection.go b/services/keep-balance/collection.go
new file mode 100644
index 0000000..db1b0f2
--- /dev/null
+++ b/services/keep-balance/collection.go
@@ -0,0 +1,153 @@
+package main
+
+import (
+ "bufio"
+ "encoding/json"
+ "fmt"
+ "strings"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/manifest"
+)
+
+type Collection struct {
+ UUID string `json:"uuid,omitempty"`
+ ExpiresAt *time.Time `json:"expires_at,omitempty"`
+ ManifestText string `json:"manifest_text,omitempty"`
+ CreatedAt *time.Time `json:"created_at,omitempty"`
+ ModifiedAt *time.Time `json:"modified_at,omitempty"`
+ PortableDataHash string `json:"portable_data_hash,omitempty"`
+ ReplicationConfirmed *int `json:"replication_confirmed,omitempty"`
+ ReplicationConfirmedAt *time.Time `json:"replication_confirmed_at,omitempty"`
+ ReplicationDesired *int `json:"replication_desired,omitempty"`
+}
+
+func (c *Collection) SizedDigests() ([]SizedDigest, error) {
+ if c.ManifestText == "" && c.PortableDataHash != "d41d8cd98f00b204e9800998ecf8427e+0" {
+ // TODO: Check more subtle forms of corruption, too
+ return nil, fmt.Errorf("manifest is missing")
+ }
+ var sds []SizedDigest
+ scanner := bufio.NewScanner(strings.NewReader(c.ManifestText))
+ scanner.Buffer(make([]byte, 1048576), len(c.ManifestText))
+ for scanner.Scan() {
+ line := scanner.Text()
+ tokens := strings.Split(line, " ")
+ if len(tokens) < 3 {
+ return nil, fmt.Errorf("Invalid stream (<3 tokens): %q", line)
+ }
+ for _, token := range tokens[1:] {
+ if !manifest.LocatorPattern.MatchString(token) {
+ // FIXME: ensure it's a file token
+ break
+ }
+ // FIXME: shouldn't assume 32 char hash
+ if i := strings.IndexRune(token[33:], '+'); i >= 0 {
+ token = token[:33+i]
+ }
+ sds = append(sds, SizedDigest(token))
+ }
+ }
+ return sds, scanner.Err()
+}
+
+type CollectionList struct {
+ Items []Collection `json:"items"`
+ ItemsAvailable int `json:"items_available"`
+ Offset int `json:"offset"`
+ Limit int `json:"limit"`
+}
+
// Filter is a single API list filter, for example
// Filter{Attr: "uuid", Operator: "=", Operand: "x"}. It is encoded in
// JSON as the array [attr, operator, operand].
type Filter struct {
	Attr     string
	Operator string
	Operand  interface{}
}

// MarshalJSON implements json.Marshaler. It has a value receiver so
// that Filter values (not only *Filter or addressable elements)
// encode as arrays.
func (f Filter) MarshalJSON() ([]byte, error) {
	return json.Marshal([]interface{}{f.Attr, f.Operator, f.Operand})
}

// ResourceListParams holds the common parameters of API list requests.
// Limit is a pointer so an explicit zero can be sent, whereas a nil
// Limit is omitted.
type ResourceListParams struct {
	Select  []string `json:"select,omitempty"`
	Filters []Filter `json:"filters,omitempty"`
	Limit   *int     `json:"limit,omitempty"`
	Offset  int      `json:"offset,omitempty"`
	Order   string   `json:"order,omitempty"`
}
+
+// EachCollection calls f once for every readable
+// collection. EachCollection stops if it encounters an error, such as
+// f returning a non-nil error.
+//
+// The progress function is called periodically with done (number of
+// times f has been called) and total (number of times f is expected
+// to be called).
+func (c *Client) EachCollection(f func(Collection) error, progress func(done, total int)) error {
+ if progress == nil {
+ progress = func(_, _ int) {}
+ }
+
+ var expectCount int
+ {
+ var page CollectionList
+ var zero int
+ err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, ResourceListParams{Limit: &zero})
+ if err != nil {
+ return err
+ }
+ expectCount = page.ItemsAvailable
+ }
+
+ params := ResourceListParams{
+ Order: "modified_at, uuid",
+ Select: []string{"uuid", "manifest_text", "modified_at", "portable_data_hash"},
+ }
+ var last Collection
+ var filterTime time.Time
+ callCount := 0
+ for {
+ progress(callCount, expectCount)
+ var page CollectionList
+ err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
+ if err != nil {
+ return err
+ }
+ for _, coll := range page.Items {
+ if last.ModifiedAt != nil && *last.ModifiedAt == *coll.ModifiedAt && last.UUID >= coll.UUID {
+ continue
+ }
+ callCount++
+ err = f(coll)
+ if err != nil {
+ return err
+ }
+ last = coll
+ }
+ if last.ModifiedAt != nil && *last.ModifiedAt == filterTime {
+ if page.ItemsAvailable > len(page.Items) {
+ // TODO: use "mtime=X && UUID>Y"
+ // filters to get all collections with
+ // this timestamp, then use "mtime>X"
+ // to get the next timestamp.
+ return fmt.Errorf("BUG: Received an entire page with the same modified_at timestamp (%v), cannot make progress", filterTime)
+ }
+ break
+ }
+ filterTime = *last.ModifiedAt
+ params.Filters = []Filter{{
+ Attr: "modified_at",
+ Operator: ">=",
+ Operand: filterTime,
+ }, {
+ Attr: "uuid",
+ Operator: "!=",
+ Operand: last.UUID,
+ }}
+ }
+ progress(callCount, expectCount)
+ if callCount < expectCount {
+ return fmt.Errorf("Retrieved only %d collections, expected %d", callCount, expectCount)
+ }
+ return nil
+}
diff --git a/services/keep-balance/example-config.json b/services/keep-balance/example-config.json
new file mode 100644
index 0000000..634ec84
--- /dev/null
+++ b/services/keep-balance/example-config.json
@@ -0,0 +1,7 @@
+{
+ "Client": {
+ "APIHost": "4xphq.arvadosapi.com:443",
+ "AuthToken": "zzzzzz",
+ "Insecure": false
+ }
+}
diff --git a/services/keep-balance/keep_service.go b/services/keep-balance/keep_service.go
new file mode 100644
index 0000000..b63fbae
--- /dev/null
+++ b/services/keep-balance/keep_service.go
@@ -0,0 +1,14 @@
+package main
+
+import (
+ "fmt"
+)
+
+type KeepService struct {
+ KeepServiceModel
+ ChangeSet
+}
+
+func (srv KeepService) String() string {
+ return fmt.Sprintf("%s (%s:%d)", srv.UUID, srv.ServiceHost, srv.ServicePort)
+}
diff --git a/services/keep-balance/keep_service_model.go b/services/keep-balance/keep_service_model.go
new file mode 100644
index 0000000..40d638d
--- /dev/null
+++ b/services/keep-balance/keep_service_model.go
@@ -0,0 +1,123 @@
+package main
+
+// TODO: move this to the SDK, and remove the word Model from names.
+
+import (
+ "fmt"
+ "bufio"
+ "net/http"
+ "strings"
+ "strconv"
+)
+
+type KeepServiceModel struct {
+ UUID string `json:"uuid"`
+ ServiceHost string `json:"service_host"`
+ ServicePort int `json:"service_port"`
+ ServiceSSLFlag bool `json:"service_ssl_flag"`
+ ServiceType string `json:"service_type"`
+ ReadOnly bool `json:"read_only"`
+}
+
+type KeepServiceModelList struct {
+ Items []KeepServiceModel `json:"items"`
+ ItemsAvailable int `json:"items_available"`
+ Offset int `json:"offset"`
+ Limit int `json:"limit"`
+}
+
+// IndexEntry is what keepstore's index response tells us about a
+// stored block.
+type IndexEntry struct {
+ SizedDigest
+ Mtime int64
+}
+
+// EachKeepServiceModel calls f once for every readable
+// KeepServiceModel. EachKeepServiceModel stops if it encounters an
+// error, such as f returning a non-nil error.
+func (c *Client) EachKeepServiceModel(f func(KeepServiceModel) error) error {
+ params := ResourceListParams{}
+ for {
+ var page KeepServiceModelList
+ err := c.RequestAndDecode(&page, "GET", "arvados/v1/keep_services", nil, params)
+ if err != nil {
+ return err
+ }
+ for _, item := range page.Items {
+ err = f(item)
+ if err != nil {
+ return err
+ }
+ }
+ params.Offset = params.Offset + len(page.Items)
+ if params.Offset >= page.ItemsAvailable {
+ return nil
+ }
+ }
+}
+
+func (s *KeepServiceModel) url(path string) string {
+ var f string
+ if s.ServiceSSLFlag {
+ f = "https://%s:%d/%s"
+ } else {
+ f = "http://%s:%d/%s"
+ }
+ return fmt.Sprintf(f, s.ServiceHost, s.ServicePort, path)
+}
+
+// String implements fmt.Stringer
+func (s *KeepServiceModel) String() string {
+ return s.UUID
+}
+
+// Index returns an unsorted list of blocks that can be retrieved from
+// this server.
+func (s *KeepServiceModel) Index(c *Client, prefix string) ([]IndexEntry, error) {
+ url := s.url("index/" + prefix)
+ req, err := http.NewRequest("GET", url, nil)
+ if err != nil {
+ return nil, fmt.Errorf("NewRequest(%v): %v", url, err)
+ }
+ resp, err := c.Do(req)
+ if err != nil {
+ return nil, fmt.Errorf("Do(%v): %v", url, err)
+ } else if resp.StatusCode != 200 {
+ return nil, fmt.Errorf("%v: %v", url, resp.Status)
+ }
+ defer resp.Body.Close()
+
+ var entries []IndexEntry
+ scanner := bufio.NewScanner(resp.Body)
+ sawEOF := false
+ for scanner.Scan() {
+ if sawEOF {
+ return nil, fmt.Errorf("Index response contained non-terminal blank line")
+ }
+ line := scanner.Text()
+ if line == "" {
+ sawEOF = true
+ continue
+ }
+ fields := strings.Split(line, " ")
+ if len(fields) != 2 {
+ return nil, fmt.Errorf("Malformed index line %q: %d fields", line, len(fields))
+ }
+ mtime, err := strconv.ParseInt(fields[1], 10, 64)
+ if err != nil {
+ return nil, fmt.Errorf("Malformed index line %q: mtime: %v", line, err)
+ }
+ entries = append(entries, IndexEntry{
+ SizedDigest: SizedDigest(fields[0]),
+ Mtime: mtime,
+ })
+ }
+ if err := scanner.Err(); err != nil {
+ return nil, fmt.Errorf("Error scanning index response: %v", err)
+ }
+ if !sawEOF {
+ return nil, fmt.Errorf("Index response had no EOF marker")
+ }
+ return entries, nil
+}
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
new file mode 100644
index 0000000..fe7f554
--- /dev/null
+++ b/services/keep-balance/main.go
@@ -0,0 +1,59 @@
+package main
+
+import (
+ "encoding/json"
+ "flag"
+ "log"
+ "os"
+ "io/ioutil"
+)
+
+type Config struct {
+ Client Client
+ Apply bool
+}
+
+var debugf func(string, ...interface{}) = func(string, ...interface{}) {}
+
+func main() {
+ var config Config
+
+ configPath := flag.String("config",
+ os.Getenv("HOME")+"/.config/arvados/keep-balance.json",
+ "configuration file")
+ debugFlag := flag.Bool("debug", false, "enable debug messages")
+ flag.Parse()
+
+ if buf, err := ioutil.ReadFile(*configPath); err != nil {
+ log.Fatalf("Reading %q: %v", *configPath, err)
+ } else if err = json.Unmarshal(buf, &config); err != nil {
+ log.Fatalf("Decoding %q: %v", *configPath, err)
+ }
+
+ if *debugFlag {
+ debugf = log.Printf
+ if j, err := json.Marshal(config); err != nil {
+ log.Fatal(err)
+ } else {
+ log.Printf("config is %s", j)
+ }
+ }
+ if err := Run(config); err != nil {
+ log.Fatal(err)
+ }
+}
+
+func Run(config Config) error {
+ var bal Balancer
+ bal.Logger = log.New(os.Stderr, "", log.LstdFlags)
+ err := bal.GetCurrentState(&config.Client)
+ if err != nil {
+ return err
+ }
+ bal.ComputeChangeSets()
+ bal.PrintStatistics()
+ if config.Apply {
+ err = bal.ApplyChangeSets(&config.Client)
+ }
+ return err
+}
diff --git a/services/keep-balance/time_me.go b/services/keep-balance/time_me.go
new file mode 100644
index 0000000..549ccd7
--- /dev/null
+++ b/services/keep-balance/time_me.go
@@ -0,0 +1,13 @@
+package main
+
+import (
+ "fmt"
+ "time"
+)
+
// timeMe starts a timer and returns a function. When that function is
// called, it prints the given label and the time elapsed since timeMe
// was called to standard output. Typical use:
//
//	defer timeMe()("label")
func timeMe() func(string) {
	start := time.Now()
	report := func(label string) {
		elapsed := time.Since(start)
		fmt.Printf("%s: %v\n", label, elapsed)
	}
	return report
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list