[ARVADOS] updated: 6251cb2459021711d9d9c0c8aee47149b4ed336c

Git user git at public.curoverse.com
Tue May 17 23:15:47 EDT 2016


Summary of changes:
 sdk/go/x/arvados/client.go                         |  15 +-
 sdk/go/x/arvados/collection.go                     |   2 +-
 services/keep-balance/balance.go                   |  79 ++++++---
 .../x/arvados => services/keep-balance}/client.go  |  13 +-
 services/keep-balance/example-config.json          |   3 +-
 services/keep-balance/integration_test.go          |  85 +++++++++
 services/keep-balance/keep_service.go              |  45 ++++-
 services/keep-balance/main.go                      |  32 +++-
 services/keep-balance/main_test.go                 | 191 +++++++++++++++++++++
 services/keep-balance/time_me.go                   |   9 +-
 services/keepstore/keepstore.go                    |   7 +-
 11 files changed, 433 insertions(+), 48 deletions(-)
 copy {sdk/go/x/arvados => services/keep-balance}/client.go (90%)
 create mode 100644 services/keep-balance/integration_test.go
 create mode 100644 services/keep-balance/main_test.go

       via  6251cb2459021711d9d9c0c8aee47149b4ed336c (commit)
       via  9fd62c6d9cc009af02aa709d5aadba37e1ce5b5b (commit)
      from  abe6659e55121a45342bbacd2651c7b51cd7d4f8 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 6251cb2459021711d9d9c0c8aee47149b4ed336c
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue May 17 23:15:38 2016 -0400

    9162: Integration test. Fix keepstore panic.

diff --git a/sdk/go/x/arvados/client.go b/sdk/go/x/arvados/client.go
index 0bbb879..16a12cb 100644
--- a/sdk/go/x/arvados/client.go
+++ b/sdk/go/x/arvados/client.go
@@ -1,6 +1,7 @@
 package arvados
 
 import (
+	"crypto/tls"
 	"encoding/json"
 	"fmt"
 	"io"
@@ -117,10 +118,15 @@ func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.
 }
 
 func (c *Client) httpClient() *http.Client {
-	if c.Client == nil {
-		return http.DefaultClient
+	client := c.Client
+	if client == nil {
+		client = http.DefaultClient
 	}
-	return c.Client
+	if c.Insecure {
+		client.Transport = &http.Transport{
+			TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}
+	}
+	return client
 }
 
 func (c *Client) apiURL(path string) string {
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index dd8601d..b86bbcd 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -73,7 +73,7 @@ func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
 func (bal *Balancer) CheckSanityLate() error {
 	if bal.errors != nil {
 		for _, err := range bal.errors {
-			log.Println("deferred error:", err)
+			bal.Logf("deferred error: %v", err)
 		}
 		return fmt.Errorf("cannot proceed safely after deferred errors")
 	}
@@ -109,7 +109,7 @@ func (bal *Balancer) CheckSanityLate() error {
 //
 // It encodes the resulting information in BlockStateMap.
 func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
-	defer timeMe()("GetCurrentState")
+	defer timeMe(bal.Logger, "GetCurrentState")()
 	bal.BlockStateMap = NewBlockStateMap()
 	bal.KeepServices = make(map[string]*KeepService)
 
@@ -201,7 +201,7 @@ func (bal *Balancer) setupServiceRoots() {
 // desired state by copying or deleting blocks, it submits those
 // changes to the relevant KeepServices' ChangeSets.
 func (bal *Balancer) ComputeChangeSets() {
-	defer timeMe()("ComputeChangeSets")
+	defer timeMe(bal.Logger, "ComputeChangeSets")()
 	bal.setupServiceRoots()
 
 	type balanceTask struct {
@@ -339,25 +339,27 @@ func (bal *Balancer) PrintStatistics() {
 			desired.blocks++
 			desired.bytes += bytes * int64(blk.Desired)
 		}
-		current.replicas += len(blk.Replicas)
-		current.blocks++
-		current.bytes += bytes * int64(len(blk.Replicas))
+		if len(blk.Replicas) > 0 {
+			current.replicas += len(blk.Replicas)
+			current.blocks++
+			current.bytes += bytes * int64(len(blk.Replicas))
+		}
 	})
-	fmt.Println("===")
-	fmt.Println(lost, "lost (0=have<want)")
-	fmt.Println(underrep, "underreplicated (0<have<want)")
-	fmt.Println(justright, "just right (have=want)")
-	fmt.Println(overrep, "overreplicated (have>want>0)")
-	fmt.Println(unref, "unreferenced (have>want=0, new)")
-	fmt.Println(garbage, "garbage (have>want=0, old)")
-	fmt.Println("===")
-	fmt.Println(desired, "total commitment (excluding unreferenced)")
-	fmt.Println(current, "total usage")
-	fmt.Println("===")
+	bal.Logf("===")
+	bal.Logf("%s lost (0=have<want)", lost)
+	bal.Logf("%s underreplicated (0<have<want)", underrep)
+	bal.Logf("%s just right (have=want)", justright)
+	bal.Logf("%s overreplicated (have>want>0)", overrep)
+	bal.Logf("%s unreferenced (have>want=0, new)", unref)
+	bal.Logf("%s garbage (have>want=0, old)", garbage)
+	bal.Logf("===")
+	bal.Logf("%s total commitment (excluding unreferenced)", desired)
+	bal.Logf("%s total usage", current)
+	bal.Logf("===")
 	for _, srv := range bal.KeepServices {
-		fmt.Printf("%s: %v\n", srv, srv.ChangeSet)
+		bal.Logf("%s: %v\n", srv, srv.ChangeSet)
 	}
-	fmt.Println("===")
+	bal.Logf("===")
 }
 
 func (bal *Balancer) CommitPulls(c *arvados.Client) error {
@@ -381,8 +383,8 @@ func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *Ke
 	errs := make(chan error)
 	for _, srv := range bal.KeepServices {
 		go func(srv *KeepService) {
-			label = fmt.Sprintf("%s: %v", srv, label)
-			defer timeMe()(label)
+			label := fmt.Sprintf("%s: %v", srv, label)
+			defer timeMe(bal.Logger, label)()
 			err := f(srv)
 			if err != nil {
 				err = fmt.Errorf("%s: %v", label, err)
@@ -393,7 +395,7 @@ func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *Ke
 	var lastErr error
 	for _ = range bal.KeepServices {
 		if err := <-errs; err != nil {
-			log.Print(err)
+			bal.Logf("%v", err)
 			lastErr = err
 		}
 	}
diff --git a/sdk/go/x/arvados/client.go b/services/keep-balance/client.go
similarity index 90%
copy from sdk/go/x/arvados/client.go
copy to services/keep-balance/client.go
index 0bbb879..aabaf93 100644
--- a/sdk/go/x/arvados/client.go
+++ b/services/keep-balance/client.go
@@ -1,4 +1,4 @@
-package arvados
+package main
 
 import (
 	"encoding/json"
@@ -17,9 +17,6 @@ type Client struct {
 	Insecure  bool
 }
 
-// NewClientFromEnv creates a new Client that uses the service
-// endpoint and credentials given by the ARVADOS_API_* environment
-// variables.
 func NewClientFromEnv() *Client {
 	return &Client{
 		APIHost:   os.Getenv("ARVADOS_API_HOST"),
@@ -33,6 +30,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
 	if c.AuthToken != "" {
 		req.Header.Add("Authorization", "OAuth2 "+c.AuthToken)
 	}
+	debugf("%#v", req)
 	return c.httpClient().Do(req)
 }
 
@@ -127,15 +125,11 @@ func (c *Client) apiURL(path string) string {
 	return "https://" + c.APIHost + "/" + path
 }
 
-// DiscoveryDocument is the Arvados server's description of itself.
 type DiscoveryDocument struct {
 	DefaultCollectionReplication int   `json:"defaultCollectionReplication"`
 	BlobSignatureTTL             int64 `json:"blobSignatureTtl"`
 }
 
-// DiscoveryDocument returns a *DiscoveryDocument. The returned object
-// should not be modified: the same object may be returned by
-// subsequent calls.
 func (c *Client) DiscoveryDocument() (*DiscoveryDocument, error) {
 	var dd DiscoveryDocument
 	return &dd, c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil)
diff --git a/services/keep-balance/integration_test.go b/services/keep-balance/integration_test.go
new file mode 100644
index 0000000..e6365a0
--- /dev/null
+++ b/services/keep-balance/integration_test.go
@@ -0,0 +1,85 @@
+package main
+
+import (
+	"bytes"
+	"log"
+	"net/http"
+	"os"
+	"strings"
+	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+	"git.curoverse.com/arvados.git/sdk/go/keepclient"
+	"git.curoverse.com/arvados.git/sdk/go/x/arvados"
+
+	check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&integrationSuite{})
+
+type integrationSuite struct {
+	config     Config
+	keepClient *keepclient.KeepClient
+}
+
+func (s *integrationSuite) SetUpSuite(c *check.C) {
+	arvadostest.ResetEnv()
+	arvadostest.StartAPI()
+	arvadostest.StartKeep(4, true)
+
+	arv, err := arvadosclient.MakeArvadosClient()
+	arv.ApiToken = arvadostest.DataManagerToken
+	c.Assert(err, check.IsNil)
+	s.keepClient = &keepclient.KeepClient{
+		Arvados: &arv,
+		Client:  &http.Client{},
+	}
+	c.Assert(s.keepClient.DiscoverKeepServers(), check.IsNil)
+	s.putReplicas(c, "foo", 4)
+	s.putReplicas(c, "bar", 1)
+}
+
+func (s *integrationSuite) putReplicas(c *check.C, data string, replicas int) {
+	s.keepClient.Want_replicas = replicas
+	_, _, err := s.keepClient.PutB([]byte(data))
+	c.Assert(err, check.IsNil)
+}
+
+func (s *integrationSuite) TearDownSuite(c *check.C) {
+	arvadostest.StopKeep(4)
+	arvadostest.StopAPI()
+}
+
+func (s *integrationSuite) SetUpTest(c *check.C) {
+	s.config = Config{
+		Client: arvados.Client{
+			APIHost:   os.Getenv("ARVADOS_API_HOST"),
+			AuthToken: arvadostest.DataManagerToken,
+			Insecure:  true,
+		},
+		ServiceTypes: []string{"disk"},
+	}
+}
+
+func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
+	var logBuf bytes.Buffer
+	for iter := 0; iter < 20; iter++ {
+		logBuf = bytes.Buffer{}
+		opts := RunOptions{
+			CommitPulls: true,
+			CommitTrash: true,
+			Logger:      log.New(&logBuf, "", log.LstdFlags),
+		}
+		err := Run(s.config, opts)
+		c.Check(err, check.IsNil)
+		if iter == 0 {
+			c.Check(logBuf.String(), check.Matches, `(?ms).*ChangeSet{Pulls:1.*`)
+			c.Check(logBuf.String(), check.Not(check.Matches), `(?ms).*ChangeSet{.*Trashes:[^0]}*`)
+		} else if strings.Contains(logBuf.String(), "ChangeSet{Pulls:0") {
+			break
+		}
+		time.Sleep(200 * time.Millisecond)
+	}
+	c.Check(logBuf.String(), check.Not(check.Matches), `(?ms).*0 replicas (0 blocks, 0 bytes) underreplicated.*`)
+}
diff --git a/services/keep-balance/keep_service.go b/services/keep-balance/keep_service.go
index ee9c3a1..7500b9f 100644
--- a/services/keep-balance/keep_service.go
+++ b/services/keep-balance/keep_service.go
@@ -4,6 +4,8 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
+	"io/ioutil"
+	"net/http"
 
 	"git.curoverse.com/arvados.git/sdk/go/x/arvados"
 )
@@ -24,11 +26,11 @@ func (srv *KeepService) URLBase() string {
 }
 
 func (srv *KeepService) CommitPulls(c *arvados.Client) error {
-	return srv.put(c, "/pull", srv.ChangeSet.Pulls)
+	return srv.put(c, "pull", srv.ChangeSet.Pulls)
 }
 
 func (srv *KeepService) CommitTrash(c *arvados.Client) error {
-	return srv.put(c, "/trash", srv.ChangeSet.Trashes)
+	return srv.put(c, "trash", srv.ChangeSet.Trashes)
 }
 
 func (srv *KeepService) put(c *arvados.Client, path string, data interface{}) error {
@@ -38,11 +40,23 @@ func (srv *KeepService) put(c *arvados.Client, path string, data interface{}) er
 		enc := json.NewEncoder(jsonW)
 		errC <- enc.Encode(data)
 	}()
-	err := c.RequestAndDecode(nil, "PUT", path, jsonR, nil)
+	url := srv.URLBase() + "/" + path
+	req, err := http.NewRequest("PUT", url, ioutil.NopCloser(jsonR))
+	if err != nil {
+		return fmt.Errorf("building request for %s: %v", url, err)
+	}
+
+	// Workaround bug (in keepstore?) -- if the server end closes
+	// the connection without setting Connection: Close, our
+	// client will hang for a while next time it tries to reuse
+	// the connection.
+	req.Close = true
+
+	err = c.DoAndDecode(nil, req)
 	if encErr := <-errC; encErr != nil {
-		return fmt.Errorf("encoding data for %v: %v", path, encErr)
+		return fmt.Errorf("encoding data for %s: %v", url, encErr)
 	} else if err != nil {
-		return fmt.Errorf("PUT %v: %v", path, err)
+		return err
 	}
 	return nil
 }
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
index 6b66dd1..23dfaa2 100644
--- a/services/keep-balance/main.go
+++ b/services/keep-balance/main.go
@@ -18,6 +18,7 @@ type Config struct {
 type RunOptions struct {
 	CommitPulls bool
 	CommitTrash bool
+	Logger *log.Logger
 }
 
 var debugf func(string, ...interface{}) = func(string, ...interface{}) {}
@@ -56,8 +57,11 @@ func main() {
 }
 
 func Run(config Config, runOptions RunOptions) error {
+	if runOptions.Logger == nil {
+		runOptions.Logger = log.New(os.Stderr, "", log.LstdFlags)
+	}
 	bal := Balancer{
-		Logger:       log.New(os.Stderr, "", log.LstdFlags),
+		Logger:       runOptions.Logger,
 		ServiceTypes: config.ServiceTypes,
 	}
 
diff --git a/services/keep-balance/main_test.go b/services/keep-balance/main_test.go
index f0479b6..73f92c8 100644
--- a/services/keep-balance/main_test.go
+++ b/services/keep-balance/main_test.go
@@ -3,7 +3,9 @@ package main
 import (
 	_ "encoding/json"
 	"fmt"
+	"io"
 	"io/ioutil"
+	"log"
 	"net/http"
 	"net/http/httptest"
 	"sync"
@@ -20,11 +22,7 @@ type stubServer struct {
 	srv      *httptest.Server
 	mutex    sync.Mutex
 	Requests []http.Request
-}
-
-type mainSuite struct {
-	stub   stubServer
-	config Config
+	logf     func(string, ...interface{})
 }
 
 func (s *stubServer) start() *http.Client {
@@ -35,6 +33,7 @@ func (s *stubServer) start() *http.Client {
 	s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		s.mutex.Lock()
 		s.Requests = append(s.Requests, *r)
+		s.logf("%+v", r)
 		s.mutex.Unlock()
 		w.Header().Set("Content-Type", "application/json")
 		s.mux.ServeHTTP(w, r)
@@ -98,11 +97,38 @@ func (s *stubServer) serveKeepstorePull() *int {
 	return s.serveStatic("/pull", `{}`)
 }
 
+type mainSuite struct {
+	stub   stubServer
+	config Config
+}
+
+func (s *mainSuite) logger(c *check.C) *log.Logger {
+	r, w := io.Pipe()
+	go func() {
+		buf := make([]byte, 10000)
+		for {
+			n, err := r.Read(buf)
+			if n > 0 {
+				if buf[n-1] == '\n' {
+					n--
+				}
+				c.Log(string(buf[:n]))
+			}
+			if err != nil {
+				break
+			}
+		}
+	}()
+	return log.New(w, "", 0)
+}
+
 func (s *mainSuite) SetUpTest(c *check.C) {
 	s.config = Config{Client: arvados.Client{
 		AuthToken: "xyzzy",
 		APIHost:   "zzzzz.arvadosapi.com",
 		Client:    s.stub.start()}}
+	s.stub.serveDiscoveryDoc()
+	s.stub.logf = c.Logf
 }
 
 func (s *mainSuite) TearDownTest(c *check.C) {
@@ -113,8 +139,8 @@ func (s *mainSuite) TestRefuseZeroCollections(c *check.C) {
 	opts := RunOptions{
 		CommitPulls: true,
 		CommitTrash: true,
+		Logger:      s.logger(c),
 	}
-	s.stub.serveDiscoveryDoc()
 	s.stub.serveCurrentUserAdmin()
 	countCollectionsList := s.stub.serveZeroCollections()
 	s.stub.serveZeroKeepServices()
@@ -129,14 +155,37 @@ func (s *mainSuite) TestRefuseNonAdmin(c *check.C) {
 	opts := RunOptions{
 		CommitPulls: true,
 		CommitTrash: true,
+		Logger:      s.logger(c),
 	}
-	s.stub.serveDiscoveryDoc()
 	countCurrentUser := s.stub.serveCurrentUserNotAdmin()
 	countColl := s.stub.serveZeroCollections()
 	countKS := s.stub.serveZeroKeepServices()
 	countTrash := s.stub.serveKeepstoreTrash()
+	countPull := s.stub.serveKeepstorePull()
 	err := Run(s.config, opts)
-	c.Check(err, check.ErrorMatches, "not admin")
+	c.Check(err, check.ErrorMatches, "Current user .* is not .* admin user")
 	c.Check(*countCurrentUser, check.Equals, 1)
 	c.Check(*countColl, check.Equals, 0)
+	c.Check(*countKS, check.Equals, 0)
+	c.Check(*countTrash, check.Equals, 0)
+	c.Check(*countPull, check.Equals, 0)
+}
+
+func (s *mainSuite) TestDryRun(c *check.C) {
+	opts := RunOptions{
+		CommitPulls: false,
+		CommitTrash: false,
+		Logger:      s.logger(c),
+	}
+	s.stub.serveCurrentUserAdmin()
+	countColl := s.stub.serveZeroCollections()
+	countKS := s.stub.serveZeroKeepServices()
+	countTrash := s.stub.serveKeepstoreTrash()
+	countPull := s.stub.serveKeepstorePull()
+	err := Run(s.config, opts)
+	c.Check(*countColl, check.Equals, 2)
+	c.Check(*countKS, check.Equals, 1)
+	c.Check(err, check.IsNil)
+	c.Check(*countTrash, check.Equals, 0)
+	c.Check(*countPull, check.Equals, 0)
 }
diff --git a/services/keep-balance/time_me.go b/services/keep-balance/time_me.go
index 549ccd7..bbc5252 100644
--- a/services/keep-balance/time_me.go
+++ b/services/keep-balance/time_me.go
@@ -1,13 +1,14 @@
 package main
 
 import (
-	"fmt"
+	"log"
 	"time"
 )
 
-func timeMe() func(string) {
+func timeMe(logger *log.Logger, label string) func() {
 	t0 := time.Now()
-	return func(label string) {
-		fmt.Printf("%s: %v\n", label, time.Since(t0))
+	logger.Printf("%s: start", label)
+	return func() {
+		logger.Printf("%s: %v", label, time.Since(t0))
 	}
 }
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 80d8670..d7da67c 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"flag"
 	"fmt"
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/httpserver"
 	"git.curoverse.com/arvados.git/sdk/go/keepclient"
 	"io/ioutil"
@@ -331,8 +332,12 @@ func main() {
 	}
 
 	// Initialize Pull queue and worker
+	arv, err := arvadosclient.MakeArvadosClient()
+	if err != nil {
+		log.Fatalf("MakeArvadosClient: %s", err)
+	}
 	keepClient := &keepclient.KeepClient{
-		Arvados:       nil,
+		Arvados:       &arv,
 		Want_replicas: 1,
 		Client:        &http.Client{},
 	}

commit 9fd62c6d9cc009af02aa709d5aadba37e1ce5b5b
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue May 17 14:30:53 2016 -0400

    9162: Really send trash/pull lists. Add tests.

diff --git a/sdk/go/x/arvados/client.go b/sdk/go/x/arvados/client.go
index f3f1286..0bbb879 100644
--- a/sdk/go/x/arvados/client.go
+++ b/sdk/go/x/arvados/client.go
@@ -52,6 +52,9 @@ func (c *Client) DoAndDecode(dst interface{}, req *http.Request) error {
 	if resp.StatusCode != 200 {
 		return fmt.Errorf("request failed (%s): %s", req.URL, resp.Status)
 	}
+	if dst == nil {
+		return nil
+	}
 	return json.Unmarshal(buf, dst)
 }
 
diff --git a/sdk/go/x/arvados/collection.go b/sdk/go/x/arvados/collection.go
index 602b02c..e5b0922 100644
--- a/sdk/go/x/arvados/collection.go
+++ b/sdk/go/x/arvados/collection.go
@@ -126,7 +126,7 @@ func (c *Client) EachCollection(f func(Collection) error, progress func(done, to
 			}
 			last = coll
 		}
-		if last.ModifiedAt != nil && *last.ModifiedAt == filterTime {
+		if last.ModifiedAt == nil || *last.ModifiedAt == filterTime {
 			if page.ItemsAvailable > len(page.Items) {
 				// TODO: use "mtime=X && UUID>Y"
 				// filters to get all collections with
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 25e8fef..dd8601d 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -360,10 +360,43 @@ func (bal *Balancer) PrintStatistics() {
 	fmt.Println("===")
 }
 
-func (bal *Balancer) ApplyChangeSets(c *arvados.Client) error {
-	defer timeMe()("ApplyChangeSets")
+func (bal *Balancer) CommitPulls(c *arvados.Client) error {
+	return bal.commitAsync(c, "send pull list",
+		func(srv *KeepService) error {
+			return srv.CommitPulls(c)
+		})
+}
+
+func (bal *Balancer) CommitTrash(c *arvados.Client) error {
+	return bal.commitAsync(c, "send trash list",
+		func(srv *KeepService) error {
+			return srv.CommitTrash(c)
+		})
+}
+
+func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
 	if err := bal.CheckSanityLate(); err != nil {
 		return err
 	}
-	return fmt.Errorf("unimplemented (FIXME)")
+	errs := make(chan error)
+	for _, srv := range bal.KeepServices {
+		go func(srv *KeepService) {
+			label = fmt.Sprintf("%s: %v", srv, label)
+			defer timeMe()(label)
+			err := f(srv)
+			if err != nil {
+				err = fmt.Errorf("%s: %v", label, err)
+			}
+			errs <- err
+		}(srv)
+	}
+	var lastErr error
+	for _ = range bal.KeepServices {
+		if err := <-errs; err != nil {
+			log.Print(err)
+			lastErr = err
+		}
+	}
+	close(errs)
+	return lastErr
 }
diff --git a/services/keep-balance/example-config.json b/services/keep-balance/example-config.json
index a6a6e58..b7cecee 100644
--- a/services/keep-balance/example-config.json
+++ b/services/keep-balance/example-config.json
@@ -4,7 +4,8 @@
         "AuthToken": "zzzzzz",
         "Insecure": false
     },
-    "Apply": false,
+    "CommitPull": true,
+    "CommitTrash": false,
     "ServiceTypes": [
         "disk"
     ]
diff --git a/services/keep-balance/keep_service.go b/services/keep-balance/keep_service.go
index e158728..ee9c3a1 100644
--- a/services/keep-balance/keep_service.go
+++ b/services/keep-balance/keep_service.go
@@ -1,7 +1,10 @@
 package main
 
 import (
+	"encoding/json"
 	"fmt"
+	"io"
+
 	"git.curoverse.com/arvados.git/sdk/go/x/arvados"
 )
 
@@ -10,12 +13,36 @@ type KeepService struct {
 	ChangeSet
 }
 
-func (srv KeepService) String() string {
+func (srv *KeepService) String() string {
 	return fmt.Sprintf("%s (%s:%d, %s)", srv.UUID, srv.ServiceHost, srv.ServicePort, srv.ServiceType)
 }
 
 var ksSchemes = map[bool]string{false: "http", true: "https"}
 
-func (srv KeepService) URLBase() string {
+func (srv *KeepService) URLBase() string {
 	return fmt.Sprintf("%s://%s:%d", ksSchemes[srv.ServiceSSLFlag], srv.ServiceHost, srv.ServicePort)
 }
+
+func (srv *KeepService) CommitPulls(c *arvados.Client) error {
+	return srv.put(c, "/pull", srv.ChangeSet.Pulls)
+}
+
+func (srv *KeepService) CommitTrash(c *arvados.Client) error {
+	return srv.put(c, "/trash", srv.ChangeSet.Trashes)
+}
+
+func (srv *KeepService) put(c *arvados.Client, path string, data interface{}) error {
+	errC := make(chan error)
+	jsonR, jsonW := io.Pipe()
+	go func() {
+		enc := json.NewEncoder(jsonW)
+		errC <- enc.Encode(data)
+	}()
+	err := c.RequestAndDecode(nil, "PUT", path, jsonR, nil)
+	if encErr := <-errC; encErr != nil {
+		return fmt.Errorf("encoding data for %v: %v", path, encErr)
+	} else if err != nil {
+		return fmt.Errorf("PUT %v: %v", path, err)
+	}
+	return nil
+}
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
index 25ed818..6b66dd1 100644
--- a/services/keep-balance/main.go
+++ b/services/keep-balance/main.go
@@ -12,18 +12,27 @@ import (
 
 type Config struct {
 	Client       arvados.Client
-	Apply        bool
 	ServiceTypes []string
 }
 
+type RunOptions struct {
+	CommitPulls bool
+	CommitTrash bool
+}
+
 var debugf func(string, ...interface{}) = func(string, ...interface{}) {}
 
 func main() {
 	var config Config
+	var runOptions RunOptions
 
 	configPath := flag.String("config",
 		os.Getenv("HOME")+"/.config/arvados/keep-balance.json",
 		"configuration file")
+	flag.BoolVar(&runOptions.CommitPulls, "commit-pulls", false,
+		"send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
+	flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
+		"send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
 	debugFlag := flag.Bool("debug", false, "enable debug messages")
 	flag.Parse()
 
@@ -41,12 +50,12 @@ func main() {
 			log.Printf("config is %s", j)
 		}
 	}
-	if err := Run(config); err != nil {
+	if err := Run(config, runOptions); err != nil {
 		log.Fatal(err)
 	}
 }
 
-func Run(config Config) error {
+func Run(config Config, runOptions RunOptions) error {
 	bal := Balancer{
 		Logger:       log.New(os.Stderr, "", log.LstdFlags),
 		ServiceTypes: config.ServiceTypes,
@@ -64,8 +73,15 @@ func Run(config Config) error {
 	if err = bal.CheckSanityLate(); err != nil {
 		return err
 	}
-	if config.Apply {
-		err = bal.ApplyChangeSets(&config.Client)
+	if runOptions.CommitPulls {
+		err = bal.CommitPulls(&config.Client)
+		if err != nil {
+			// Skip trash if we can't pull. (Too cautious?)
+			return err
+		}
+	}
+	if runOptions.CommitTrash {
+		err = bal.CommitTrash(&config.Client)
 	}
 	return err
 }
diff --git a/services/keep-balance/main_test.go b/services/keep-balance/main_test.go
new file mode 100644
index 0000000..f0479b6
--- /dev/null
+++ b/services/keep-balance/main_test.go
@@ -0,0 +1,142 @@
+package main
+
+import (
+	_ "encoding/json"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"net/http/httptest"
+	"sync"
+
+	"git.curoverse.com/arvados.git/sdk/go/x/arvados"
+
+	check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&mainSuite{})
+
+type stubServer struct {
+	mux      *http.ServeMux
+	srv      *httptest.Server
+	mutex    sync.Mutex
+	Requests []http.Request
+}
+
+type mainSuite struct {
+	stub   stubServer
+	config Config
+}
+
+func (s *stubServer) start() *http.Client {
+	// Set up a config.Client that forwards all requests to s.mux
+	// via s.srv. Test cases will attach handlers to s.mux to get
+	// the desired responses.
+	s.mux = http.NewServeMux()
+	s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		s.mutex.Lock()
+		s.Requests = append(s.Requests, *r)
+		s.mutex.Unlock()
+		w.Header().Set("Content-Type", "application/json")
+		s.mux.ServeHTTP(w, r)
+	}))
+	return &http.Client{Transport: s}
+}
+
+func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) {
+	w := httptest.NewRecorder()
+	s.mux.ServeHTTP(w, req)
+	return &http.Response{
+		StatusCode: w.Code,
+		Status:     fmt.Sprintf("%d %s", w.Code, http.StatusText(w.Code)),
+		Header:     w.HeaderMap,
+		Body:       ioutil.NopCloser(w.Body)}, nil
+}
+
+func (s *stubServer) stop() {
+	s.srv.Close()
+}
+
+func (s *stubServer) serveStatic(path, data string) *int {
+	var count int
+	s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
+		count++
+		fmt.Fprintln(w, data)
+	})
+	return &count
+}
+
+func (s *stubServer) serveCurrentUserAdmin() *int {
+	return s.serveStatic("/arvados/v1/users/current",
+		`{"uuid":"zzzzz-tpzed-000000000000000","is_admin":true,"is_active":true}`)
+}
+
+func (s *stubServer) serveCurrentUserNotAdmin() *int {
+	return s.serveStatic("/arvados/v1/users/current",
+		`{"uuid":"zzzzz-tpzed-000000000000000","is_admin":false,"is_active":true}`)
+}
+
+func (s *stubServer) serveDiscoveryDoc() *int {
+	return s.serveStatic("/discovery/v1/apis/arvados/v1/rest",
+		`{"default_collection_replication":2}`)
+}
+
+func (s *stubServer) serveZeroCollections() *int {
+	return s.serveStatic("/arvados/v1/collections",
+		`{"items":[],"items_available":0}`)
+}
+
+func (s *stubServer) serveZeroKeepServices() *int {
+	return s.serveStatic("/arvados/v1/keep_services",
+		`{"items":[],"items_available":0}`)
+}
+
+func (s *stubServer) serveKeepstoreTrash() *int {
+	return s.serveStatic("/trash", `{}`)
+}
+
+func (s *stubServer) serveKeepstorePull() *int {
+	return s.serveStatic("/pull", `{}`)
+}
+
+func (s *mainSuite) SetUpTest(c *check.C) {
+	s.config = Config{Client: arvados.Client{
+		AuthToken: "xyzzy",
+		APIHost:   "zzzzz.arvadosapi.com",
+		Client:    s.stub.start()}}
+}
+
+func (s *mainSuite) TearDownTest(c *check.C) {
+	s.stub.stop()
+}
+
+func (s *mainSuite) TestRefuseZeroCollections(c *check.C) {
+	opts := RunOptions{
+		CommitPulls: true,
+		CommitTrash: true,
+	}
+	s.stub.serveDiscoveryDoc()
+	s.stub.serveCurrentUserAdmin()
+	countCollectionsList := s.stub.serveZeroCollections()
+	s.stub.serveZeroKeepServices()
+	countTrash := s.stub.serveKeepstoreTrash()
+	err := Run(s.config, opts)
+	c.Check(err, check.ErrorMatches, "received zero collections")
+	c.Check(*countCollectionsList, check.Equals, 2)
+	c.Check(*countTrash, check.Equals, 0)
+}
+
+func (s *mainSuite) TestRefuseNonAdmin(c *check.C) {
+	opts := RunOptions{
+		CommitPulls: true,
+		CommitTrash: true,
+	}
+	s.stub.serveDiscoveryDoc()
+	countCurrentUser := s.stub.serveCurrentUserNotAdmin()
+	countColl := s.stub.serveZeroCollections()
+	countKS := s.stub.serveZeroKeepServices()
+	countTrash := s.stub.serveKeepstoreTrash()
+	err := Run(s.config, opts)
+	c.Check(err, check.ErrorMatches, "not admin")
+	c.Check(*countCurrentUser, check.Equals, 1)
+	c.Check(*countColl, check.Equals, 0)
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list