[ARVADOS] updated: d86afd81c109b73abc4511c65a9584084374395d

Git user git at public.curoverse.com
Sat May 14 16:13:11 EDT 2016


Summary of changes:
 services/keep-balance/balance.go    | 56 ++++++++++++++++++++++++-------------
 services/keep-balance/collection.go |  2 ++
 2 files changed, 38 insertions(+), 20 deletions(-)

  discards  6be020502307375cb59e782f9fd2437b7772c692 (commit)
       via  d86afd81c109b73abc4511c65a9584084374395d (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the old revision is not a strict subset of the new revision.  This
situation occurs when you --force push a change and generate a repository
containing something like this:

 * -- * -- B -- O -- O -- O (6be020502307375cb59e782f9fd2437b7772c692)
            \
             N -- N -- N (d86afd81c109b73abc4511c65a9584084374395d)

When this happens we assume that you have already received alert emails for
all of the O revisions, so here we report only the revisions in the N
branch since the common base, B.

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit d86afd81c109b73abc4511c65a9584084374395d
Author: Tom Clegg <tom at curoverse.com>
Date:   Sat May 14 16:13:00 2016 -0400

    9162: Add keep-balance

diff --git a/build/run-tests.sh b/build/run-tests.sh
index d229341..66e38b7 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -70,6 +70,7 @@ services/fuse
 services/keep-web
 services/keepproxy
 services/keepstore
+services/keep-balance
 services/login-sync
 services/nodemanager
 services/crunch-run
@@ -714,6 +715,7 @@ gostuff=(
     services/keep-web
     services/keepstore
     sdk/go/keepclient
+    services/keep-balance
     services/keepproxy
     services/datamanager/summary
     services/datamanager/collection
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
new file mode 100644
index 0000000..f9b8f34
--- /dev/null
+++ b/services/keep-balance/balance.go
@@ -0,0 +1,332 @@
+package main
+
+import (
+	"fmt"
+	"log"
+	"runtime"
+	"sync"
+	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/keepclient"
+)
+
+// Balancer holds the information needed to compare current and
+// desired block replication across a set of Keep services, and the
+// changes computed from that comparison.
+type Balancer struct {
+	// BlockStateMap maps each block to its known replicas and its
+	// desired replication; GetCurrentState replaces it.
+	*BlockStateMap
+	// KeepServices maps service UUID to service. GetCurrentState
+	// fills it with services of type "disk" and "azure" only.
+	KeepServices       map[string]*KeepService
+	// DefaultReplication applies to collections that do not set
+	// replication_desired.
+	DefaultReplication int
+	// serviceRoots maps each service UUID to itself; it is the
+	// roots map given to keepclient.NewRootSorter in balanceBlock.
+	serviceRoots       map[string]string
+	// Logger receives progress messages from Logf; nil disables them.
+	Logger             *log.Logger
+	// MinMtime is the current Unix time minus the discovery
+	// document's blobSignatureTtl, set by GetCurrentState.
+	MinMtime           int64
+	// errors collects non-fatal problems found while loading
+	// collections; ApplyChangeSets refuses to run if it is non-empty.
+	errors             []error
+	// mutex is held by addCollection while appending to errors.
+	mutex sync.Mutex
+}
+
+// addCollection raises the desired replication of each block
+// referenced by coll to the collection's replication_desired, or to
+// DefaultReplication when the collection does not specify one.
+//
+// A manifest that cannot be parsed is not fatal here: the problem is
+// appended to bal.errors (which makes ApplyChangeSets refuse to
+// proceed) and nil is returned so the remaining collections are still
+// processed. The returned error is therefore always nil.
+func (bal *Balancer) addCollection(coll Collection) error {
+	digests, parseErr := coll.SizedDigests()
+	if parseErr != nil {
+		bal.mutex.Lock()
+		defer bal.mutex.Unlock()
+		bal.errors = append(bal.errors, fmt.Errorf("%v: %v", coll.UUID, parseErr))
+		return nil
+	}
+
+	want := bal.DefaultReplication
+	if rd := coll.ReplicationDesired; rd != nil {
+		want = *rd
+	}
+	debugf("%v: %d block x%d", coll.UUID, len(digests), want)
+	bal.BlockStateMap.IncreaseDesired(want, digests)
+	return nil
+}
+
+// addKeepService registers srv in bal.KeepServices when its type is
+// "disk" or "azure". Services of type "proxy" are skipped without
+// error; any other type is reported as an error.
+func (bal *Balancer) addKeepService(srv KeepServiceModel) error {
+	if srv.ServiceType == "proxy" {
+		return nil
+	}
+	if srv.ServiceType != "disk" && srv.ServiceType != "azure" {
+		return fmt.Errorf("%v has unexpected service type %q", srv.UUID, srv.ServiceType)
+	}
+	bal.KeepServices[srv.UUID] = &KeepService{KeepServiceModel: srv}
+	return nil
+}
+
+// Logf formats a message and writes it via bal.Logger. It does
+// nothing when no Logger is configured.
+func (bal *Balancer) Logf(f string, args ...interface{}) {
+	if bal.Logger == nil {
+		return
+	}
+	bal.Logger.Printf(f, args...)
+}
+
+func (bal *Balancer) CheckSanity() error {
+	// TODO: current user is admin
+	// TODO: number of collections > 0
+	// TODO: some blocks have desired > 0
+	// TODO: no two services have identical indexes
+	// TODO: no collisions (same md5, different size)
+	return nil
+}
+
+// GetCurrentState determines the current replication state, and the
+// desired replication level, for every block that is either
+// retrievable or referenced.
+//
+// It determines the current replication state by reading the block index
+// from every known Keep service.
+//
+// It determines the desired replication level by retrieving all
+// collection manifests in the database (API server).
+//
+// It encodes the resulting information in BlockStateMap.
+func (bal *Balancer) GetCurrentState(c *Client) error {
+	defer timeMe()("GetCurrentState")
+	bal.BlockStateMap = NewBlockStateMap()
+	bal.KeepServices = make(map[string]*KeepService)
+
+	dd, err := c.DiscoveryDocument()
+	if err != nil {
+		return err
+	}
+	bal.DefaultReplication = dd.DefaultCollectionReplication
+	bal.MinMtime = time.Now().Unix() - dd.BlobSignatureTTL
+
+	errs := make(chan error)
+	wg := sync.WaitGroup{}
+
+	if err := c.EachKeepServiceModel(bal.addKeepService); err != nil {
+		return err
+	}
+	for _, srv := range bal.KeepServices {
+		wg.Add(1)
+		go func(srv *KeepService) {
+			defer wg.Done()
+			bal.Logf("%s: retrieve index", srv)
+			idx, err := srv.Index(c, "")
+			if err != nil {
+				errs <- fmt.Errorf("%s: %v", srv, err)
+				return
+			}
+			bal.Logf("%s: add replicas to map", srv)
+			bal.BlockStateMap.AddReplicas(srv, idx)
+			bal.Logf("%s: done", srv)
+		}(srv)
+	}
+
+	collQ := make(chan Collection, 200)
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for coll := range collQ {
+			err := bal.addCollection(coll)
+			if err != nil {
+				errs <- err
+				for _ = range collQ {
+				}
+				return
+			}
+		}
+	}()
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		err = c.EachCollection(
+			func(coll Collection) error {
+				collQ <- coll
+				if len(errs) > 0 {
+					// some other GetCurrentState
+					// error happened: no point
+					// getting any more
+					// collections.
+					return fmt.Errorf("")
+				}
+				return nil
+			}, func(done, total int) {
+				bal.Logf("collections: %d/%d", done, total)
+			})
+		close(collQ)
+		if err != nil {
+			errs <- err
+		}
+	}()
+
+	go func() {
+		wg.Wait()
+		close(errs)
+	}()
+	return <-errs
+}
+
+// ComputeChangeSets compares, for each known block, the current and
+// desired replication states. If it is possible to get closer to the
+// desired state by copying or deleting blocks, it submits those
+// changes to the relevant KeepServices' ChangeSets.
+//
+// Blocks are handed to runtime.NumCPU() worker goroutines, each of
+// which calls balanceBlock; ComputeChangeSets returns once every block
+// has been processed.
+func (bal *Balancer) ComputeChangeSets() {
+	defer timeMe()("ComputeChangeSets")
+
+	// balanceBlock passes this map to keepclient.NewRootSorter and
+	// looks the sorted results up in bal.KeepServices, so each
+	// service UUID maps to itself.
+	roots := make(map[string]string)
+	for _, srv := range bal.KeepServices {
+		roots[srv.UUID] = srv.UUID
+	}
+	bal.serviceRoots = roots
+
+	type job struct {
+		id  SizedDigest
+		blk *BlockState
+	}
+	jobs := make(chan job, 16)
+
+	nWorkers := runtime.NumCPU()
+	var done sync.WaitGroup
+	done.Add(nWorkers)
+	for w := 0; w < nWorkers; w++ {
+		go func() {
+			defer done.Done()
+			for j := range jobs {
+				bal.balanceBlock(j.id, j.blk)
+			}
+		}()
+	}
+
+	bal.BlockStateMap.Apply(func(id SizedDigest, blk *BlockState) {
+		jobs <- job{id: id, blk: blk}
+	})
+	close(jobs)
+	done.Wait()
+}
+
+// balanceBlock compares current state to desired state for a single
+// block, and makes the appropriate ChangeSet calls.
+//
+// Services are visited in the order keepclient.NewRootSorter produces
+// for the block's hash. A service holding a replica gets a Trash
+// request once enough distinct replicas have been seen in better
+// positions (and the replica is older than bal.MinMtime). A writable
+// service without a replica gets a Pull request while better positions,
+// counting pulls already requested, hold fewer than blk.Desired.
+func (bal *Balancer) balanceBlock(blkid SizedDigest, blk *BlockState) {
+	debugf("balanceBlock: %v %+v", blkid, blk)
+	uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
+	hasRepl := make(map[string]Replica, len(bal.serviceRoots))
+	for _, repl := range blk.Replicas {
+		hasRepl[repl.UUID] = repl
+	}
+	// To be safe we assume two replicas with the same Mtime are
+	// in fact the same replica being reported more than
+	// once. len(uniqueBestRepl) is the number of distinct
+	// replicas in the best rendezvous positions we've considered
+	// so far.
+	uniqueBestRepl := make(map[int64]bool, len(bal.serviceRoots))
+	// pulls is the number of Pull changes we have already
+	// requested. (For purposes of deciding whether to Pull to
+	// rendezvous position N, we should assume all pulls we have
+	// requested on rendezvous positions M<N will be successful.)
+	pulls := 0
+	for _, uuid := range uuids {
+		srv := bal.KeepServices[uuid]
+		// TODO: request a Touch if Mtime is duplicated.
+		if repl, ok := hasRepl[srv.UUID]; ok {
+			// This service has a replica. We should
+			// delete it if [1] we already have enough
+			// distinct replicas in better rendezvous
+			// positions, [2] this replica's Mtime is
+			// distinct from the better replicas' Mtime,
+			// and [3] this replica's Mtime is older than
+			// MinMtime (now minus the blob signature
+			// TTL), so replicas whose Mtime falls within
+			// the TTL window are never trashed.
+			if len(uniqueBestRepl) >= blk.Desired &&
+				!uniqueBestRepl[repl.Mtime] &&
+				repl.Mtime < bal.MinMtime {
+				srv.AddTrash(Trash{
+					SizedDigest: blkid,
+					Mtime:       repl.Mtime,
+				})
+			}
+			uniqueBestRepl[repl.Mtime] = true
+		} else if pulls+len(uniqueBestRepl) < blk.Desired &&
+			len(blk.Replicas) > 0 &&
+			!srv.ReadOnly {
+			// This service doesn't have a replica. We
+			// should pull one to this server if we don't
+			// already have enough (existing+requested)
+			// replicas in better rendezvous positions.
+			srv.AddPull(Pull{
+				SizedDigest: blkid,
+				Source:      blk.Replicas[0].KeepService,
+			})
+			pulls++
+		}
+	}
+}
+
+// blocksNBytes tallies replicas, blocks, and bytes for one category
+// reported by PrintStatistics.
+type blocksNBytes struct {
+	replicas int
+	blocks   int
+	bytes    int64
+}
+
+// String implements fmt.Stringer.
+func (bb blocksNBytes) String() string {
+	const layout = "%d replicas (%d blocks, %d bytes)"
+	return fmt.Sprintf(layout, bb.replicas, bb.blocks, bb.bytes)
+}
+
+// PrintStatistics writes a summary to stdout. It prints replica, block,
+// and byte totals for lost, underreplicated, overreplicated, garbage,
+// unreferenced, and correctly replicated blocks. It then prints overall
+// desired and current usage, and each service's ChangeSet. A block with
+// replicas but desired=0 counts as "unreferenced" if any replica has
+// Mtime >= bal.MinMtime, and as "garbage" otherwise.
+func (bal *Balancer) PrintStatistics() {
+	var lost, overrep, unref, garbage, underrep, justright, desired, current blocksNBytes
+	bal.BlockStateMap.Apply(func(blkid SizedDigest, blk *BlockState) {
+		have := len(blk.Replicas)
+		surplus := have - blk.Desired
+		bytes := blkid.Size()
+		switch {
+		case have == 0 && blk.Desired > 0:
+			lost.replicas -= surplus
+			lost.blocks++
+			lost.bytes += bytes * int64(-surplus)
+		case have < blk.Desired:
+			underrep.replicas -= surplus
+			underrep.blocks++
+			underrep.bytes += bytes * int64(-surplus)
+		case have > 0 && blk.Desired == 0:
+			counter := &garbage
+			for _, r := range blk.Replicas {
+				if r.Mtime >= bal.MinMtime {
+					counter = &unref
+					break
+				}
+			}
+			counter.replicas += surplus
+			counter.blocks++
+			counter.bytes += bytes * int64(surplus)
+		case have > blk.Desired:
+			overrep.replicas += surplus
+			overrep.blocks++
+			overrep.bytes += bytes * int64(surplus)
+		default:
+			justright.replicas += blk.Desired
+			justright.blocks++
+			justright.bytes += bytes * int64(blk.Desired)
+		}
+
+		if blk.Desired > 0 {
+			desired.replicas += blk.Desired
+			desired.blocks++
+			desired.bytes += bytes * int64(blk.Desired)
+		}
+		current.replicas += have
+		current.blocks++
+		current.bytes += bytes * int64(have)
+	})
+	fmt.Println("===")
+	fmt.Println(lost, "lost (0=have<want)")
+	fmt.Println(underrep, "underreplicated (0<have<want)")
+	fmt.Println(overrep, "overreplicated (have>want>0)")
+	fmt.Println(garbage, "garbage (have>want=0, old)")
+	fmt.Println(unref, "unreferenced (have>want=0, new)")
+	fmt.Println(justright, "just right (have=want)")
+	fmt.Println("===")
+	fmt.Println(desired, "total commitment (excluding unreferenced)")
+	fmt.Println(current, "total usage")
+	fmt.Println("===")
+	for _, srv := range bal.KeepServices {
+		// Format a pointer: passing srv.ChangeSet by value
+		// would copy its mutex and slices (go vet copylocks).
+		fmt.Printf("%s: %v\n", srv, &srv.ChangeSet)
+	}
+	fmt.Println("===")
+}
+
+// ApplyChangeSets is meant to send the computed ChangeSets to the keep
+// services. If errors were recorded while gathering state, it logs
+// each of them and refuses to proceed. Applying changes is not
+// implemented yet (FIXME), so otherwise it does nothing and returns nil.
+func (bal *Balancer) ApplyChangeSets(c *Client) error {
+	defer timeMe()("ApplyChangeSets")
+
+	for _, err := range bal.errors {
+		log.Println("deferred error:", err)
+	}
+	if len(bal.errors) != 0 {
+		return fmt.Errorf("refusing to apply changesets after errors")
+	}
+	// FIXME: do stuff
+	return nil
+}
diff --git a/services/keep-balance/block_state.go b/services/keep-balance/block_state.go
new file mode 100644
index 0000000..c087ecd
--- /dev/null
+++ b/services/keep-balance/block_state.go
@@ -0,0 +1,95 @@
+package main
+
+import (
+	"strconv"
+	"strings"
+	"sync"
+)
+
+// SizedDigest is a block hash followed by "+" and the block size in
+// bytes, e.g. "acbd18db4cc2f85cedef654fccc4a4d8+3".
+type SizedDigest string
+
+// Size returns the size part of the digest (the text between the first
+// and second "+"). It returns 0 when the digest has no "+" or the size
+// does not parse as an int64, rather than panicking on malformed input.
+func (sd SizedDigest) Size() int64 {
+	parts := strings.SplitN(string(sd), "+", 3)
+	if len(parts) < 2 {
+		return 0
+	}
+	n, err := strconv.ParseInt(parts[1], 10, 64)
+	if err != nil {
+		return 0
+	}
+	return n
+}
+
+// Replica is one copy of a block, as listed in a Keep service's index.
+type Replica struct {
+	// KeepService is the service whose index listed the block.
+	*KeepService
+	// Mtime is the timestamp from the index line. It is compared
+	// with Balancer.MinMtime, which is computed in Unix seconds.
+	// NOTE(review): confirm keepstore reports seconds here, not
+	// nanoseconds, or those comparisons are meaningless.
+	Mtime int64
+}
+
+// BlockState tracks one block: the replicas reported by Keep service
+// indexes, and the highest replication level requested for it.
+type BlockState struct {
+	Replicas []Replica
+	Desired  int
+}
+
+// AddReplica records one more replica of the block.
+func (bs *BlockState) AddReplica(r Replica) {
+	bs.Replicas = append(bs.Replicas, r)
+}
+
+// IncreaseDesired raises Desired to n if n is larger; it never lowers
+// Desired.
+func (bs *BlockState) IncreaseDesired(n int) {
+	if n > bs.Desired {
+		bs.Desired = n
+	}
+}
+
+// BlockStateMap is a goroutine-safe wrapper around a
+// map[SizedDigest]*BlockState. Every exported method holds mutex while
+// it reads or modifies entries.
+type BlockStateMap struct {
+	entries map[SizedDigest]*BlockState
+	mutex   sync.Mutex
+}
+
+// NewBlockStateMap returns an empty BlockStateMap ready for use.
+func NewBlockStateMap() *BlockStateMap {
+	return &BlockStateMap{
+		entries: make(map[SizedDigest]*BlockState),
+	}
+}
+
+// Get returns a BlockState entry, allocating a new one if needed.
+// The map lookup/insert is locked; mutating the returned *BlockState
+// concurrently with other users is the caller's responsibility.
+func (bsm *BlockStateMap) Get(blkid SizedDigest) *BlockState {
+	bsm.mutex.Lock()
+	defer bsm.mutex.Unlock()
+	return bsm.get(blkid)
+}
+
+// get is Get without locking; the caller must hold bsm.mutex.
+func (bsm *BlockStateMap) get(blkid SizedDigest) *BlockState {
+	// TODO? Allocate BlockState structs a slice at a time,
+	// instead of one at a time.
+	blk := bsm.entries[blkid]
+	if blk == nil {
+		blk = &BlockState{}
+		bsm.entries[blkid] = blk
+	}
+	return blk
+}
+
+// Apply runs f on each entry in the map, in unspecified order. The
+// mutex is held throughout, so f must not call other BlockStateMap
+// methods (sync.Mutex is not reentrant).
+func (bsm *BlockStateMap) Apply(f func(SizedDigest, *BlockState)) {
+	bsm.mutex.Lock()
+	defer bsm.mutex.Unlock()
+
+	for blkid, blk := range bsm.entries {
+		f(blkid, blk)
+	}
+}
+
+// AddReplicas updates the map to indicate srv has a replica of each
+// block in idx.
+func (bsm *BlockStateMap) AddReplicas(srv *KeepService, idx []IndexEntry) {
+	bsm.mutex.Lock()
+	defer bsm.mutex.Unlock()
+
+	for _, ent := range idx {
+		bsm.get(ent.SizedDigest).AddReplica(Replica{
+			KeepService: srv,
+			Mtime:       ent.Mtime,
+		})
+	}
+}
+
+// IncreaseDesired updates the map to indicate the desired replication
+// for the given blocks is at least n.
+func (bsm *BlockStateMap) IncreaseDesired(n int, blocks []SizedDigest) {
+	bsm.mutex.Lock()
+	defer bsm.mutex.Unlock()
+
+	for _, blkid := range blocks {
+		bsm.get(blkid).IncreaseDesired(n)
+	}
+}
diff --git a/services/keep-balance/change_set.go b/services/keep-balance/change_set.go
new file mode 100644
index 0000000..2090e81
--- /dev/null
+++ b/services/keep-balance/change_set.go
@@ -0,0 +1,48 @@
+package main
+
+import (
+	"fmt"
+	"sync"
+)
+
+// Pull is a request to retrieve a block from a remote server, and
+// store it locally.
+type Pull struct {
+	SizedDigest
+	// Source is the service to copy the block from.
+	Source *KeepService
+}
+
+// Trash is a request to delete a block.
+type Trash struct {
+	SizedDigest
+	// Mtime is the Mtime of the replica to delete, as reported in
+	// the index; presumably lets the server skip a copy that has
+	// since been rewritten -- TODO confirm against keepstore.
+	Mtime int64
+}
+
+// ChangeSet is a set of change requests that will be sent to a
+// keepstore server. AddPull and AddTrash lock mutex, so they may be
+// called from several goroutines at once.
+type ChangeSet struct {
+	Pulls   []Pull
+	Trashes []Trash
+	mutex   sync.Mutex
+}
+
+// AddPull adds a Pull operation.
+func (cs *ChangeSet) AddPull(p Pull) {
+	cs.mutex.Lock()
+	defer cs.mutex.Unlock()
+	cs.Pulls = append(cs.Pulls, p)
+}
+
+// AddTrash adds a Trash operation.
+func (cs *ChangeSet) AddTrash(t Trash) {
+	cs.mutex.Lock()
+	defer cs.mutex.Unlock()
+	cs.Trashes = append(cs.Trashes, t)
+}
+
+// String implements fmt.Stringer, reporting how many operations of each
+// kind are queued.
+//
+// NOTE(review): with a value receiver the whole struct, mutex
+// included, is copied at call time without holding the lock, so the
+// lock taken here only guards the private copy (go vet copylocks
+// reports this). A pointer receiver would fix it, but every caller
+// that formats a ChangeSet value must then pass a pointer instead.
+func (cs ChangeSet) String() string {
+	cs.mutex.Lock()
+	defer cs.mutex.Unlock()
+	pulls, trashes := len(cs.Pulls), len(cs.Trashes)
+	return fmt.Sprintf("ChangeSet{Pulls=%d, Trashes=%d}", pulls, trashes)
+}
diff --git a/services/keep-balance/client.go b/services/keep-balance/client.go
new file mode 100644
index 0000000..86d95a2
--- /dev/null
+++ b/services/keep-balance/client.go
@@ -0,0 +1,133 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"net/http"
+	"net/url"
+	"os"
+)
+
+// Client is an Arvados API client. Requests go through the embedded
+// *http.Client when it is non-nil, otherwise http.DefaultClient.
+type Client struct {
+	*http.Client
+	APIHost   string
+	AuthToken string
+	Insecure  bool
+}
+
+// NewClientFromEnv returns a Client configured from ARVADOS_API_HOST,
+// ARVADOS_API_TOKEN, and ARVADOS_API_HOST_INSECURE. Insecure is true
+// whenever the last of these is non-empty.
+func NewClientFromEnv() *Client {
+	c := new(Client)
+	c.APIHost = os.Getenv("ARVADOS_API_HOST")
+	c.AuthToken = os.Getenv("ARVADOS_API_TOKEN")
+	c.Insecure = len(os.Getenv("ARVADOS_API_HOST_INSECURE")) > 0
+	return c
+}
+
+// Do adds authentication headers and then calls (*http.Client)Do().
+//
+// Only the method and URL go to the debug log: dumping the whole
+// request (as "%#v" would) includes the Authorization header and
+// therefore the API token.
+func (c *Client) Do(req *http.Request) (*http.Response, error) {
+	if c.AuthToken != "" {
+		req.Header.Add("Authorization", "OAuth2 "+c.AuthToken)
+	}
+	debugf("%s %s", req.Method, req.URL)
+	return c.httpClient().Do(req)
+}
+
+// DoAndDecode performs req and unmarshals the response (which must be
+// JSON) into dst. Use this instead of RequestAndDecode if you need
+// more control of the http.Request object.
+//
+// The whole body is read before the status is checked. Any status
+// other than 200 is returned as an error without decoding the body.
+func (c *Client) DoAndDecode(dst interface{}, req *http.Request) error {
+	resp, err := c.Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+
+	payload, readErr := ioutil.ReadAll(resp.Body)
+	switch {
+	case readErr != nil:
+		return readErr
+	case resp.StatusCode != http.StatusOK:
+		return fmt.Errorf("request failed (%s): %s", req.URL, resp.Status)
+	default:
+		return json.Unmarshal(payload, dst)
+	}
+}
+
+// RequestAndDecode performs an API request and unmarshals the
+// response (which must be JSON) into dst. Method and body arguments
+// are the same as for http.NewRequest(). The given path is added to
+// the server's scheme/host/port to form the request URL.
+//
+// params may be nil, a url.Values, or any value whose JSON encoding is
+// an object. Each top-level key becomes one parameter: string values
+// are used as-is, anything else is JSON-encoded. Params are sent in the
+// query string for every method, so they are never silently discarded.
+//
+// path must not contain a query string.
+func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error {
+	urlString := c.apiURL(path)
+	var urlValues url.Values
+	if v, ok := params.(url.Values); ok {
+		urlValues = v
+	} else if params != nil {
+		// Convert an arbitrary struct to url.Values. For
+		// example, Foo{Bar: []int{1,2,3}, Baz: "waz"} becomes
+		// url.Values{"Bar": {"[1,2,3]"}, "Baz": {"waz"}}
+		//
+		// TODO: Do this more efficiently, possibly using
+		// json.Decode/Encode, so the whole thing doesn't have
+		// to get encoded, decoded, and re-encoded.
+		j, err := json.Marshal(params)
+		if err != nil {
+			return err
+		}
+		var generic map[string]interface{}
+		err = json.Unmarshal(j, &generic)
+		if err != nil {
+			return err
+		}
+		urlValues = url.Values{}
+		for k, v := range generic {
+			if v, ok := v.(string); ok {
+				urlValues.Set(k, v)
+				continue
+			}
+			j, err := json.Marshal(v)
+			if err != nil {
+				return err
+			}
+			urlValues.Set(k, string(j))
+		}
+	}
+	if urlValues != nil {
+		// FIXME: what if params don't fit in URL
+		// NOTE(review): non-GET requests without a body could
+		// send params as a form-encoded body instead.
+		u, err := url.Parse(urlString)
+		if err != nil {
+			return err
+		}
+		u.RawQuery = urlValues.Encode()
+		urlString = u.String()
+	}
+	req, err := http.NewRequest(method, urlString, body)
+	if err != nil {
+		return err
+	}
+	return c.DoAndDecode(dst, req)
+}
+
+// httpClient returns the *http.Client to send requests with: the
+// embedded one if set, otherwise http.DefaultClient.
+//
+// NOTE(review): c.Insecure is not consulted anywhere, so
+// ARVADOS_API_HOST_INSECURE does not disable TLS verification --
+// confirm whether that is intended.
+func (c *Client) httpClient() *http.Client {
+	if hc := c.Client; hc != nil {
+		return hc
+	}
+	return http.DefaultClient
+}
+
+// apiURL returns the https URL of path on the configured API host.
+func (c *Client) apiURL(path string) string {
+	return fmt.Sprintf("https://%s/%s", c.APIHost, path)
+}
+
+// DiscoveryDocument holds the discovery document fields keep-balance
+// uses.
+type DiscoveryDocument struct {
+	DefaultCollectionReplication int   `json:"defaultCollectionReplication"`
+	BlobSignatureTTL             int64 `json:"blobSignatureTtl"`
+}
+
+// DiscoveryDocument retrieves the API server's discovery document.
+// The returned pointer is never nil, even when an error is returned.
+func (c *Client) DiscoveryDocument() (*DiscoveryDocument, error) {
+	dd := new(DiscoveryDocument)
+	err := c.RequestAndDecode(dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil)
+	return dd, err
+}
diff --git a/services/keep-balance/collection.go b/services/keep-balance/collection.go
new file mode 100644
index 0000000..801411c
--- /dev/null
+++ b/services/keep-balance/collection.go
@@ -0,0 +1,155 @@
+package main
+
+import (
+	"bufio"
+	"encoding/json"
+	"fmt"
+	"strings"
+	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/manifest"
+)
+
+// Collection is the subset of an Arvados collection record that
+// keep-balance decodes from API responses.
+type Collection struct {
+	UUID                   string     `json:"uuid,omitempty"`
+	ExpiresAt              *time.Time `json:"expires_at,omitempty"`
+	ManifestText           string     `json:"manifest_text,omitempty"`
+	CreatedAt              *time.Time `json:"created_at,omitempty"`
+	ModifiedAt             *time.Time `json:"modified_at,omitempty"`
+	PortableDataHash       string     `json:"portable_data_hash,omitempty"`
+	ReplicationConfirmed   *int       `json:"replication_confirmed,omitempty"`
+	ReplicationConfirmedAt *time.Time `json:"replication_confirmed_at,omitempty"`
+	// ReplicationDesired is nil when absent from the response;
+	// addCollection then uses Balancer.DefaultReplication.
+	ReplicationDesired     *int       `json:"replication_desired,omitempty"`
+}
+
+// SizedDigests returns the "hash+size" digest of every block locator
+// in the collection's manifest, in manifest order, duplicates included.
+// Anything after the size (locator hints) is stripped.
+//
+// It returns an error if the manifest text is empty for a collection
+// whose portable data hash is not that of the empty collection, or if
+// any manifest line has fewer than 3 space-separated tokens.
+func (c *Collection) SizedDigests() ([]SizedDigest, error) {
+	if c.ManifestText == "" && c.PortableDataHash != "d41d8cd98f00b204e9800998ecf8427e+0" {
+		// TODO: Check more subtle forms of corruption, too
+		return nil, fmt.Errorf("manifest is missing")
+	}
+	var sds []SizedDigest
+	scanner := bufio.NewScanner(strings.NewReader(c.ManifestText))
+	// Let the scanner allocate a small buffer and grow it as
+	// needed, instead of allocating 1 MiB for every collection.
+	// The limit must exceed the longest possible line: a Scanner
+	// fails with ErrTooLong when its buffer fills to the limit
+	// before it has seen EOF, which happens for a single line of
+	// exactly len(ManifestText) bytes with no trailing newline.
+	scanner.Buffer(nil, len(c.ManifestText)+1)
+	for scanner.Scan() {
+		line := scanner.Text()
+		tokens := strings.Split(line, " ")
+		if len(tokens) < 3 {
+			return nil, fmt.Errorf("Invalid stream (<3 tokens): %q", line)
+		}
+		for _, token := range tokens[1:] {
+			if !manifest.LocatorPattern.MatchString(token) {
+				// FIXME: ensure it's a file token
+				break
+			}
+			// FIXME: shouldn't assume 32 char hash
+			if i := strings.IndexRune(token[33:], '+'); i >= 0 {
+				token = token[:33+i]
+			}
+			sds = append(sds, SizedDigest(token))
+		}
+	}
+	return sds, scanner.Err()
+}
+
+// CollectionList is one page of results from the collections list
+// API: the items on the page, items_available (which EachCollection
+// treats as the expected total), and the offset and limit of the page.
+type CollectionList struct {
+	Items          []Collection `json:"items"`
+	ItemsAvailable int          `json:"items_available"`
+	Offset         int          `json:"offset"`
+	Limit          int          `json:"limit"`
+}
+
+// Filter is one condition in an API list request's "filters"
+// parameter, encoded as [attr, operator, operand].
+type Filter struct {
+	Attr     string
+	Operator string
+	Operand  interface{}
+}
+
+// MarshalJSON encodes f as the 3-element JSON array the API expects.
+//
+// It uses a value receiver so it also applies when a Filter is
+// marshaled as a non-addressable value (e.g. json.Marshal(Filter{...})
+// or a Filter stored in an interface). With a pointer receiver,
+// encoding/json would fall back to encoding a JSON object in those
+// cases.
+func (f Filter) MarshalJSON() ([]byte, error) {
+	return json.Marshal([]interface{}{f.Attr, f.Operator, f.Operand})
+}
+
+// ResourceListParams holds the parameters of an API list request.
+// RequestAndDecode converts them via their JSON encoding, so the json
+// tags are the parameter names and omitempty fields are left out when
+// they have zero values.
+type ResourceListParams struct {
+	Select  []string `json:"select,omitempty"`
+	Filters []Filter `json:"filters,omitempty"`
+	// Limit is a pointer so an explicit 0 (requesting only the
+	// item count) is still sent despite omitempty.
+	Limit   *int     `json:"limit,omitempty"`
+	Offset  int      `json:"offset,omitempty"`
+	Order   string   `json:"order,omitempty"`
+}
+
+// EachCollection calls f once for every readable
+// collection. EachCollection stops if it encounters an error, such as
+// f returning a non-nil error.
+//
+// The progress function is called periodically with done (number of
+// times f has been called) and total (number of times f is expected
+// to be called).
+//
+// Collections are paged in (modified_at, uuid) order. Each subsequent
+// page is filtered to modified_at >= the last timestamp seen, so every
+// collection must have modified_at; one without it is reported as an
+// error. It is also an error if, after the last page, f was called
+// fewer times than the count reported by the initial request.
+func (c *Client) EachCollection(f func(Collection) error, progress func(done, total int)) error {
+	if progress == nil {
+		progress = func(_, _ int) {}
+	}
+
+	var expectCount int
+	{
+		var page CollectionList
+		var zero int
+		err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, ResourceListParams{Limit: &zero})
+		if err != nil {
+			return err
+		}
+		expectCount = page.ItemsAvailable
+	}
+
+	limit := 1000
+	params := ResourceListParams{
+		Limit: &limit,
+		Order: "modified_at, uuid",
+		// replication_desired must be selected, otherwise
+		// Collection.ReplicationDesired is always nil.
+		Select: []string{"uuid", "manifest_text", "modified_at", "portable_data_hash", "replication_desired"},
+	}
+	var last Collection
+	var filterTime time.Time
+	callCount := 0
+	for {
+		progress(callCount, expectCount)
+		var page CollectionList
+		err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
+		if err != nil {
+			return err
+		}
+		for _, coll := range page.Items {
+			if coll.ModifiedAt == nil {
+				return fmt.Errorf("collection %s has no modified_at, cannot page through collections", coll.UUID)
+			}
+			if last.ModifiedAt != nil && last.ModifiedAt.Equal(*coll.ModifiedAt) && last.UUID >= coll.UUID {
+				continue
+			}
+			callCount++
+			err = f(coll)
+			if err != nil {
+				return err
+			}
+			last = coll
+		}
+		if last.ModifiedAt == nil {
+			// No collection has been processed at all,
+			// i.e., the first page was empty. Without this
+			// check *last.ModifiedAt below would panic.
+			break
+		}
+		if last.ModifiedAt.Equal(filterTime) {
+			if page.ItemsAvailable > len(page.Items) {
+				// TODO: use "mtime=X && UUID>Y"
+				// filters to get all collections with
+				// this timestamp, then use "mtime>X"
+				// to get the next timestamp.
+				return fmt.Errorf("BUG: Received an entire page with the same modified_at timestamp (%v), cannot make progress", filterTime)
+			}
+			break
+		}
+		filterTime = *last.ModifiedAt
+		params.Filters = []Filter{{
+			Attr:     "modified_at",
+			Operator: ">=",
+			Operand:  filterTime,
+		}, {
+			Attr:     "uuid",
+			Operator: "!=",
+			Operand:  last.UUID,
+		}}
+	}
+	progress(callCount, expectCount)
+	if callCount < expectCount {
+		return fmt.Errorf("Retrieved only %d collections, expected %d", callCount, expectCount)
+	}
+	return nil
+}
diff --git a/services/keep-balance/example-config.json b/services/keep-balance/example-config.json
new file mode 100644
index 0000000..634ec84
--- /dev/null
+++ b/services/keep-balance/example-config.json
@@ -0,0 +1,7 @@
+{
+    "Client": {
+        "APIHost": "4xphq.arvadosapi.com:443",
+        "AuthToken": "zzzzzz",
+        "Insecure": false
+    }
+}
diff --git a/services/keep-balance/keep_service.go b/services/keep-balance/keep_service.go
new file mode 100644
index 0000000..b63fbae
--- /dev/null
+++ b/services/keep-balance/keep_service.go
@@ -0,0 +1,14 @@
+package main
+
+import (
+	"fmt"
+)
+
+// KeepService is a Keep service known to the balancer, together with
+// the ChangeSet computed for it.
+type KeepService struct {
+	KeepServiceModel
+	ChangeSet
+}
+
+// String implements fmt.Stringer, returning "uuid (host:port)".
+//
+// A pointer receiver avoids copying the embedded ChangeSet (its mutex
+// and slices) on every call. A value receiver copies it even while
+// other goroutines are appending to it, which is a data race.
+func (srv *KeepService) String() string {
+	return fmt.Sprintf("%s (%s:%d)", srv.UUID, srv.ServiceHost, srv.ServicePort)
+}
diff --git a/services/keep-balance/keep_service_model.go b/services/keep-balance/keep_service_model.go
new file mode 100644
index 0000000..40d638d
--- /dev/null
+++ b/services/keep-balance/keep_service_model.go
@@ -0,0 +1,123 @@
+package main
+
+// TODO: move this to the SDK, and remove the word Model from names.
+
+import (
+	"fmt"
+	"bufio"
+	"net/http"
+	"strings"
+	"strconv"
+)
+
+// KeepServiceModel is a keep_services record as decoded from the API.
+type KeepServiceModel struct {
+	UUID           string `json:"uuid"`
+	ServiceHost    string `json:"service_host"`
+	ServicePort    int    `json:"service_port"`
+	ServiceSSLFlag bool   `json:"service_ssl_flag"`
+	ServiceType    string `json:"service_type"`
+	ReadOnly       bool   `json:"read_only"`
+}
+
+// KeepServiceModelList is one page of results from the keep_services
+// list API.
+type KeepServiceModelList struct {
+	Items          []KeepServiceModel `json:"items"`
+	ItemsAvailable int                `json:"items_available"`
+	Offset         int                `json:"offset"`
+	Limit          int                `json:"limit"`
+}
+
+// IndexEntry is what keepstore's index response tells us about a
+// stored block.
+type IndexEntry struct {
+	SizedDigest
+	// Mtime is the integer timestamp from the index line.
+	Mtime int64
+}
+
+// EachKeepServiceModel calls f once for every readable
+// KeepServiceModel. EachKeepServiceModel stops if it encounters an
+// error, such as f returning a non-nil error.
+//
+// Results are paged by offset. If the server returns an empty page
+// before items_available is reached, an error is returned instead of
+// requesting the same offset forever.
+func (c *Client) EachKeepServiceModel(f func(KeepServiceModel) error) error {
+	params := ResourceListParams{}
+	for {
+		var page KeepServiceModelList
+		err := c.RequestAndDecode(&page, "GET", "arvados/v1/keep_services", nil, params)
+		if err != nil {
+			return err
+		}
+		for _, item := range page.Items {
+			err = f(item)
+			if err != nil {
+				return err
+			}
+		}
+		params.Offset = params.Offset + len(page.Items)
+		if params.Offset >= page.ItemsAvailable {
+			return nil
+		}
+		if len(page.Items) == 0 {
+			return fmt.Errorf("keep_services: empty page at offset %d, but items_available is %d", params.Offset, page.ItemsAvailable)
+		}
+	}
+}
+
+func (s *KeepServiceModel) url(path string) string {
+	var f string
+	if s.ServiceSSLFlag {
+		f = "https://%s:%d/%s"
+	} else {
+		f = "http://%s:%d/%s"
+	}
+	return fmt.Sprintf(f, s.ServiceHost, s.ServicePort, path)
+}
+
+// String implements fmt.Stringer; a service is identified by its UUID.
+func (s *KeepServiceModel) String() string {
+	uuid := s.UUID
+	return uuid
+}
+
+// Index returns an unsorted list of blocks that can be retrieved from
+// this server.
+//
+// It requests GET index/{prefix} and expects one "digest mtime" line
+// per block, terminated by a single blank line. A missing terminator,
+// any line after it, or a malformed line is an error.
+func (s *KeepServiceModel) Index(c *Client, prefix string) ([]IndexEntry, error) {
+	url := s.url("index/" + prefix)
+	req, err := http.NewRequest("GET", url, nil)
+	if err != nil {
+		return nil, fmt.Errorf("NewRequest(%v): %v", url, err)
+	}
+	resp, err := c.Do(req)
+	if err != nil {
+		return nil, fmt.Errorf("Do(%v): %v", url, err)
+	}
+	// Close the body on every path, including non-200 responses;
+	// otherwise the connection is leaked.
+	defer resp.Body.Close()
+	if resp.StatusCode != 200 {
+		return nil, fmt.Errorf("%v: %v", url, resp.Status)
+	}
+
+	var entries []IndexEntry
+	scanner := bufio.NewScanner(resp.Body)
+	sawEOF := false
+	for scanner.Scan() {
+		if sawEOF {
+			return nil, fmt.Errorf("Index response contained non-terminal blank line")
+		}
+		line := scanner.Text()
+		if line == "" {
+			sawEOF = true
+			continue
+		}
+		fields := strings.Split(line, " ")
+		if len(fields) != 2 {
+			return nil, fmt.Errorf("Malformed index line %q: %d fields", line, len(fields))
+		}
+		mtime, err := strconv.ParseInt(fields[1], 10, 64)
+		if err != nil {
+			return nil, fmt.Errorf("Malformed index line %q: mtime: %v", line, err)
+		}
+		entries = append(entries, IndexEntry{
+			SizedDigest: SizedDigest(fields[0]),
+			Mtime:       mtime,
+		})
+	}
+	if err := scanner.Err(); err != nil {
+		return nil, fmt.Errorf("Error scanning index response: %v", err)
+	}
+	if !sawEOF {
+		return nil, fmt.Errorf("Index response had no EOF marker")
+	}
+	return entries, nil
+}
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
new file mode 100644
index 0000000..fe7f554
--- /dev/null
+++ b/services/keep-balance/main.go
@@ -0,0 +1,59 @@
+package main
+
+import (
+	"encoding/json"
+	"flag"
+	"log"
+	"os"
+	"io/ioutil"
+)
+
+// Config is the keep-balance configuration, decoded from the JSON file
+// named by the -config flag.
+type Config struct {
+	// Client holds the API host, token, and Insecure flag.
+	Client Client
+	// Apply enables ApplyChangeSets; when false, Run only reports.
+	Apply bool
+}
+
+// debugf logs debug messages. It discards them unless main sets it to
+// log.Printf because -debug was given.
+var debugf func(string, ...interface{}) = func(string, ...interface{}) {}
+
+// main parses flags and loads the JSON config file (default
+// $HOME/.config/arvados/keep-balance.json). It then runs one balancing
+// pass, exiting via log.Fatal on any error. With -debug it enables
+// debugf and logs the config with the API token masked.
+func main() {
+	var config Config
+
+	configPath := flag.String("config",
+		os.Getenv("HOME")+"/.config/arvados/keep-balance.json",
+		"configuration file")
+	debugFlag := flag.Bool("debug", false, "enable debug messages")
+	flag.Parse()
+
+	if buf, err := ioutil.ReadFile(*configPath); err != nil {
+		log.Fatalf("Reading %q: %v", *configPath, err)
+	} else if err = json.Unmarshal(buf, &config); err != nil {
+		log.Fatalf("Decoding %q: %v", *configPath, err)
+	}
+
+	if *debugFlag {
+		debugf = log.Printf
+		// Log a copy with the API token masked so debug
+		// output does not disclose the credential.
+		redacted := config
+		if redacted.Client.AuthToken != "" {
+			redacted.Client.AuthToken = "[redacted]"
+		}
+		if j, err := json.Marshal(redacted); err != nil {
+			log.Fatal(err)
+		} else {
+			log.Printf("config is %s", j)
+		}
+	}
+	if err := Run(config); err != nil {
+		log.Fatal(err)
+	}
+}
+
+func Run(config Config) error {
+	var bal Balancer
+	bal.Logger = log.New(os.Stderr, "", log.LstdFlags)
+	err := bal.GetCurrentState(&config.Client)
+	if err != nil {
+		return err
+	}
+	bal.ComputeChangeSets()
+	bal.PrintStatistics()
+	if config.Apply {
+		err = bal.ApplyChangeSets(&config.Client)
+	}
+	return err
+}
diff --git a/services/keep-balance/time_me.go b/services/keep-balance/time_me.go
new file mode 100644
index 0000000..549ccd7
--- /dev/null
+++ b/services/keep-balance/time_me.go
@@ -0,0 +1,13 @@
+package main
+
+import (
+	"fmt"
+	"time"
+)
+
+// timeMe starts a stopwatch and returns a function that prints the
+// given label and the time elapsed since timeMe was called. Typical
+// use: defer timeMe()("label").
+func timeMe() func(string) {
+	start := time.Now()
+	report := func(label string) {
+		elapsed := time.Since(start)
+		fmt.Printf("%s: %v\n", label, elapsed)
+	}
+	return report
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list