[ARVADOS] updated: 6251cb2459021711d9d9c0c8aee47149b4ed336c
Git user
git at public.curoverse.com
Tue May 17 23:15:47 EDT 2016
Summary of changes:
sdk/go/x/arvados/client.go | 15 +-
sdk/go/x/arvados/collection.go | 2 +-
services/keep-balance/balance.go | 79 ++++++---
.../x/arvados => services/keep-balance}/client.go | 13 +-
services/keep-balance/example-config.json | 3 +-
services/keep-balance/integration_test.go | 85 +++++++++
services/keep-balance/keep_service.go | 45 ++++-
services/keep-balance/main.go | 32 +++-
services/keep-balance/main_test.go | 191 +++++++++++++++++++++
services/keep-balance/time_me.go | 9 +-
services/keepstore/keepstore.go | 7 +-
11 files changed, 433 insertions(+), 48 deletions(-)
copy {sdk/go/x/arvados => services/keep-balance}/client.go (90%)
create mode 100644 services/keep-balance/integration_test.go
create mode 100644 services/keep-balance/main_test.go
via 6251cb2459021711d9d9c0c8aee47149b4ed336c (commit)
via 9fd62c6d9cc009af02aa709d5aadba37e1ce5b5b (commit)
from abe6659e55121a45342bbacd2651c7b51cd7d4f8 (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit 6251cb2459021711d9d9c0c8aee47149b4ed336c
Author: Tom Clegg <tom at curoverse.com>
Date: Tue May 17 23:15:38 2016 -0400
9162: Integration test. Fix keepstore panic.
diff --git a/sdk/go/x/arvados/client.go b/sdk/go/x/arvados/client.go
index 0bbb879..16a12cb 100644
--- a/sdk/go/x/arvados/client.go
+++ b/sdk/go/x/arvados/client.go
@@ -1,6 +1,7 @@
package arvados
import (
+ "crypto/tls"
"encoding/json"
"fmt"
"io"
@@ -117,10 +118,15 @@ func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.
}
func (c *Client) httpClient() *http.Client {
- if c.Client == nil {
- return http.DefaultClient
+ client := c.Client
+ if client == nil {
+ client = http.DefaultClient
}
- return c.Client
+ if c.Insecure {
+ client.Transport = &http.Transport{
+ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}
+ }
+ return client
}
func (c *Client) apiURL(path string) string {
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index dd8601d..b86bbcd 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -73,7 +73,7 @@ func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
func (bal *Balancer) CheckSanityLate() error {
if bal.errors != nil {
for _, err := range bal.errors {
- log.Println("deferred error:", err)
+ bal.Logf("deferred error: %v", err)
}
return fmt.Errorf("cannot proceed safely after deferred errors")
}
@@ -109,7 +109,7 @@ func (bal *Balancer) CheckSanityLate() error {
//
// It encodes the resulting information in BlockStateMap.
func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
- defer timeMe()("GetCurrentState")
+ defer timeMe(bal.Logger, "GetCurrentState")()
bal.BlockStateMap = NewBlockStateMap()
bal.KeepServices = make(map[string]*KeepService)
@@ -201,7 +201,7 @@ func (bal *Balancer) setupServiceRoots() {
// desired state by copying or deleting blocks, it submits those
// changes to the relevant KeepServices' ChangeSets.
func (bal *Balancer) ComputeChangeSets() {
- defer timeMe()("ComputeChangeSets")
+ defer timeMe(bal.Logger, "ComputeChangeSets")()
bal.setupServiceRoots()
type balanceTask struct {
@@ -339,25 +339,27 @@ func (bal *Balancer) PrintStatistics() {
desired.blocks++
desired.bytes += bytes * int64(blk.Desired)
}
- current.replicas += len(blk.Replicas)
- current.blocks++
- current.bytes += bytes * int64(len(blk.Replicas))
+ if len(blk.Replicas) > 0 {
+ current.replicas += len(blk.Replicas)
+ current.blocks++
+ current.bytes += bytes * int64(len(blk.Replicas))
+ }
})
- fmt.Println("===")
- fmt.Println(lost, "lost (0=have<want)")
- fmt.Println(underrep, "underreplicated (0<have<want)")
- fmt.Println(justright, "just right (have=want)")
- fmt.Println(overrep, "overreplicated (have>want>0)")
- fmt.Println(unref, "unreferenced (have>want=0, new)")
- fmt.Println(garbage, "garbage (have>want=0, old)")
- fmt.Println("===")
- fmt.Println(desired, "total commitment (excluding unreferenced)")
- fmt.Println(current, "total usage")
- fmt.Println("===")
+ bal.Logf("===")
+ bal.Logf("%s lost (0=have<want)", lost)
+ bal.Logf("%s underreplicated (0<have<want)", underrep)
+ bal.Logf("%s just right (have=want)", justright)
+ bal.Logf("%s overreplicated (have>want>0)", overrep)
+ bal.Logf("%s unreferenced (have>want=0, new)", unref)
+ bal.Logf("%s garbage (have>want=0, old)", garbage)
+ bal.Logf("===")
+ bal.Logf("%s total commitment (excluding unreferenced)", desired)
+ bal.Logf("%s total usage", current)
+ bal.Logf("===")
for _, srv := range bal.KeepServices {
- fmt.Printf("%s: %v\n", srv, srv.ChangeSet)
+ bal.Logf("%s: %v\n", srv, srv.ChangeSet)
}
- fmt.Println("===")
+ bal.Logf("===")
}
func (bal *Balancer) CommitPulls(c *arvados.Client) error {
@@ -381,8 +383,8 @@ func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *Ke
errs := make(chan error)
for _, srv := range bal.KeepServices {
go func(srv *KeepService) {
- label = fmt.Sprintf("%s: %v", srv, label)
- defer timeMe()(label)
+ label := fmt.Sprintf("%s: %v", srv, label)
+ defer timeMe(bal.Logger, label)()
err := f(srv)
if err != nil {
err = fmt.Errorf("%s: %v", label, err)
@@ -393,7 +395,7 @@ func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *Ke
var lastErr error
for _ = range bal.KeepServices {
if err := <-errs; err != nil {
- log.Print(err)
+ bal.Logf("%v", err)
lastErr = err
}
}
diff --git a/sdk/go/x/arvados/client.go b/services/keep-balance/client.go
similarity index 90%
copy from sdk/go/x/arvados/client.go
copy to services/keep-balance/client.go
index 0bbb879..aabaf93 100644
--- a/sdk/go/x/arvados/client.go
+++ b/services/keep-balance/client.go
@@ -1,4 +1,4 @@
-package arvados
+package main
import (
"encoding/json"
@@ -17,9 +17,6 @@ type Client struct {
Insecure bool
}
-// NewClientFromEnv creates a new Client that uses the service
-// endpoint and credentials given by the ARVADOS_API_* environment
-// variables.
func NewClientFromEnv() *Client {
return &Client{
APIHost: os.Getenv("ARVADOS_API_HOST"),
@@ -33,6 +30,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
if c.AuthToken != "" {
req.Header.Add("Authorization", "OAuth2 "+c.AuthToken)
}
+ debugf("%#v", req)
return c.httpClient().Do(req)
}
@@ -127,15 +125,11 @@ func (c *Client) apiURL(path string) string {
return "https://" + c.APIHost + "/" + path
}
-// DiscoveryDocument is the Arvados server's description of itself.
type DiscoveryDocument struct {
DefaultCollectionReplication int `json:"defaultCollectionReplication"`
BlobSignatureTTL int64 `json:"blobSignatureTtl"`
}
-// DiscoveryDocument returns a *DiscoveryDocument. The returned object
-// should not be modified: the same object may be returned by
-// subsequent calls.
func (c *Client) DiscoveryDocument() (*DiscoveryDocument, error) {
var dd DiscoveryDocument
return &dd, c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil)
diff --git a/services/keep-balance/integration_test.go b/services/keep-balance/integration_test.go
new file mode 100644
index 0000000..e6365a0
--- /dev/null
+++ b/services/keep-balance/integration_test.go
@@ -0,0 +1,85 @@
+package main
+
+import (
+ "bytes"
+ "log"
+ "net/http"
+ "os"
+ "strings"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "git.curoverse.com/arvados.git/sdk/go/x/arvados"
+
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&integrationSuite{})
+
+type integrationSuite struct {
+ config Config
+ keepClient *keepclient.KeepClient
+}
+
+func (s *integrationSuite) SetUpSuite(c *check.C) {
+ arvadostest.ResetEnv()
+ arvadostest.StartAPI()
+ arvadostest.StartKeep(4, true)
+
+ arv, err := arvadosclient.MakeArvadosClient()
+ arv.ApiToken = arvadostest.DataManagerToken
+ c.Assert(err, check.IsNil)
+ s.keepClient = &keepclient.KeepClient{
+ Arvados: &arv,
+ Client: &http.Client{},
+ }
+ c.Assert(s.keepClient.DiscoverKeepServers(), check.IsNil)
+ s.putReplicas(c, "foo", 4)
+ s.putReplicas(c, "bar", 1)
+}
+
+func (s *integrationSuite) putReplicas(c *check.C, data string, replicas int) {
+ s.keepClient.Want_replicas = replicas
+ _, _, err := s.keepClient.PutB([]byte(data))
+ c.Assert(err, check.IsNil)
+}
+
+func (s *integrationSuite) TearDownSuite(c *check.C) {
+ arvadostest.StopKeep(4)
+ arvadostest.StopAPI()
+}
+
+func (s *integrationSuite) SetUpTest(c *check.C) {
+ s.config = Config{
+ Client: arvados.Client{
+ APIHost: os.Getenv("ARVADOS_API_HOST"),
+ AuthToken: arvadostest.DataManagerToken,
+ Insecure: true,
+ },
+ ServiceTypes: []string{"disk"},
+ }
+}
+
+func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
+ var logBuf bytes.Buffer
+ for iter := 0; iter < 20; iter++ {
+ logBuf = bytes.Buffer{}
+ opts := RunOptions{
+ CommitPulls: true,
+ CommitTrash: true,
+ Logger: log.New(&logBuf, "", log.LstdFlags),
+ }
+ err := Run(s.config, opts)
+ c.Check(err, check.IsNil)
+ if iter == 0 {
+ c.Check(logBuf.String(), check.Matches, `(?ms).*ChangeSet{Pulls:1.*`)
+ c.Check(logBuf.String(), check.Not(check.Matches), `(?ms).*ChangeSet{.*Trashes:[^0]}*`)
+ } else if strings.Contains(logBuf.String(), "ChangeSet{Pulls:0") {
+ break
+ }
+ time.Sleep(200 * time.Millisecond)
+ }
+ c.Check(logBuf.String(), check.Not(check.Matches), `(?ms).*0 replicas (0 blocks, 0 bytes) underreplicated.*`)
+}
diff --git a/services/keep-balance/keep_service.go b/services/keep-balance/keep_service.go
index ee9c3a1..7500b9f 100644
--- a/services/keep-balance/keep_service.go
+++ b/services/keep-balance/keep_service.go
@@ -4,6 +4,8 @@ import (
"encoding/json"
"fmt"
"io"
+ "io/ioutil"
+ "net/http"
"git.curoverse.com/arvados.git/sdk/go/x/arvados"
)
@@ -24,11 +26,11 @@ func (srv *KeepService) URLBase() string {
}
func (srv *KeepService) CommitPulls(c *arvados.Client) error {
- return srv.put(c, "/pull", srv.ChangeSet.Pulls)
+ return srv.put(c, "pull", srv.ChangeSet.Pulls)
}
func (srv *KeepService) CommitTrash(c *arvados.Client) error {
- return srv.put(c, "/trash", srv.ChangeSet.Trashes)
+ return srv.put(c, "trash", srv.ChangeSet.Trashes)
}
func (srv *KeepService) put(c *arvados.Client, path string, data interface{}) error {
@@ -38,11 +40,23 @@ func (srv *KeepService) put(c *arvados.Client, path string, data interface{}) er
enc := json.NewEncoder(jsonW)
errC <- enc.Encode(data)
}()
- err := c.RequestAndDecode(nil, "PUT", path, jsonR, nil)
+ url := srv.URLBase() + "/" + path
+ req, err := http.NewRequest("PUT", url, ioutil.NopCloser(jsonR))
+ if err != nil {
+ return fmt.Errorf("building request for %s: %v", url, err)
+ }
+
+ // Workaround bug (in keepstore?) -- if the server end closes
+ // the connection without setting Connection: Close, our
+ // client will hang for a while next time it tries to reuse
+ // the connection.
+ req.Close = true
+
+ err = c.DoAndDecode(nil, req)
if encErr := <-errC; encErr != nil {
- return fmt.Errorf("encoding data for %v: %v", path, encErr)
+ return fmt.Errorf("encoding data for %s: %v", url, encErr)
} else if err != nil {
- return fmt.Errorf("PUT %v: %v", path, err)
+ return err
}
return nil
}
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
index 6b66dd1..23dfaa2 100644
--- a/services/keep-balance/main.go
+++ b/services/keep-balance/main.go
@@ -18,6 +18,7 @@ type Config struct {
type RunOptions struct {
CommitPulls bool
CommitTrash bool
+ Logger *log.Logger
}
var debugf func(string, ...interface{}) = func(string, ...interface{}) {}
@@ -56,8 +57,11 @@ func main() {
}
func Run(config Config, runOptions RunOptions) error {
+ if runOptions.Logger == nil {
+ runOptions.Logger = log.New(os.Stderr, "", log.LstdFlags)
+ }
bal := Balancer{
- Logger: log.New(os.Stderr, "", log.LstdFlags),
+ Logger: runOptions.Logger,
ServiceTypes: config.ServiceTypes,
}
diff --git a/services/keep-balance/main_test.go b/services/keep-balance/main_test.go
index f0479b6..73f92c8 100644
--- a/services/keep-balance/main_test.go
+++ b/services/keep-balance/main_test.go
@@ -3,7 +3,9 @@ package main
import (
_ "encoding/json"
"fmt"
+ "io"
"io/ioutil"
+ "log"
"net/http"
"net/http/httptest"
"sync"
@@ -20,11 +22,7 @@ type stubServer struct {
srv *httptest.Server
mutex sync.Mutex
Requests []http.Request
-}
-
-type mainSuite struct {
- stub stubServer
- config Config
+ logf func(string, ...interface{})
}
func (s *stubServer) start() *http.Client {
@@ -35,6 +33,7 @@ func (s *stubServer) start() *http.Client {
s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
s.mutex.Lock()
s.Requests = append(s.Requests, *r)
+ s.logf("%+v", r)
s.mutex.Unlock()
w.Header().Set("Content-Type", "application/json")
s.mux.ServeHTTP(w, r)
@@ -98,11 +97,38 @@ func (s *stubServer) serveKeepstorePull() *int {
return s.serveStatic("/pull", `{}`)
}
+type mainSuite struct {
+ stub stubServer
+ config Config
+}
+
+func (s *mainSuite) logger(c *check.C) *log.Logger {
+ r, w := io.Pipe()
+ go func() {
+ buf := make([]byte, 10000)
+ for {
+ n, err := r.Read(buf)
+ if n > 0 {
+ if buf[n-1] == '\n' {
+ n--
+ }
+ c.Log(string(buf[:n]))
+ }
+ if err != nil {
+ break
+ }
+ }
+ }()
+ return log.New(w, "", 0)
+}
+
func (s *mainSuite) SetUpTest(c *check.C) {
s.config = Config{Client: arvados.Client{
AuthToken: "xyzzy",
APIHost: "zzzzz.arvadosapi.com",
Client: s.stub.start()}}
+ s.stub.serveDiscoveryDoc()
+ s.stub.logf = c.Logf
}
func (s *mainSuite) TearDownTest(c *check.C) {
@@ -113,8 +139,8 @@ func (s *mainSuite) TestRefuseZeroCollections(c *check.C) {
opts := RunOptions{
CommitPulls: true,
CommitTrash: true,
+ Logger: s.logger(c),
}
- s.stub.serveDiscoveryDoc()
s.stub.serveCurrentUserAdmin()
countCollectionsList := s.stub.serveZeroCollections()
s.stub.serveZeroKeepServices()
@@ -129,14 +155,37 @@ func (s *mainSuite) TestRefuseNonAdmin(c *check.C) {
opts := RunOptions{
CommitPulls: true,
CommitTrash: true,
+ Logger: s.logger(c),
}
- s.stub.serveDiscoveryDoc()
countCurrentUser := s.stub.serveCurrentUserNotAdmin()
countColl := s.stub.serveZeroCollections()
countKS := s.stub.serveZeroKeepServices()
countTrash := s.stub.serveKeepstoreTrash()
+ countPull := s.stub.serveKeepstorePull()
err := Run(s.config, opts)
- c.Check(err, check.ErrorMatches, "not admin")
+ c.Check(err, check.ErrorMatches, "Current user .* is not .* admin user")
c.Check(*countCurrentUser, check.Equals, 1)
c.Check(*countColl, check.Equals, 0)
+ c.Check(*countKS, check.Equals, 0)
+ c.Check(*countTrash, check.Equals, 0)
+ c.Check(*countPull, check.Equals, 0)
+}
+
+func (s *mainSuite) TestDryRun(c *check.C) {
+ opts := RunOptions{
+ CommitPulls: false,
+ CommitTrash: false,
+ Logger: s.logger(c),
+ }
+ s.stub.serveCurrentUserAdmin()
+ countColl := s.stub.serveZeroCollections()
+ countKS := s.stub.serveZeroKeepServices()
+ countTrash := s.stub.serveKeepstoreTrash()
+ countPull := s.stub.serveKeepstorePull()
+ err := Run(s.config, opts)
+ c.Check(*countColl, check.Equals, 2)
+ c.Check(*countKS, check.Equals, 1)
+ c.Check(err, check.IsNil)
+ c.Check(*countTrash, check.Equals, 0)
+ c.Check(*countPull, check.Equals, 0)
}
diff --git a/services/keep-balance/time_me.go b/services/keep-balance/time_me.go
index 549ccd7..bbc5252 100644
--- a/services/keep-balance/time_me.go
+++ b/services/keep-balance/time_me.go
@@ -1,13 +1,14 @@
package main
import (
- "fmt"
+ "log"
"time"
)
-func timeMe() func(string) {
+func timeMe(logger *log.Logger, label string) func() {
t0 := time.Now()
- return func(label string) {
- fmt.Printf("%s: %v\n", label, time.Since(t0))
+ logger.Printf("%s: start", label)
+ return func() {
+ logger.Printf("%s: %v", label, time.Since(t0))
}
}
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 80d8670..d7da67c 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -4,6 +4,7 @@ import (
"bytes"
"flag"
"fmt"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"io/ioutil"
@@ -331,8 +332,12 @@ func main() {
}
// Initialize Pull queue and worker
+ arv, err := arvadosclient.MakeArvadosClient()
+ if err != nil {
+ log.Fatalf("MakeArvadosClient: %s", err)
+ }
keepClient := &keepclient.KeepClient{
- Arvados: nil,
+ Arvados: &arv,
Want_replicas: 1,
Client: &http.Client{},
}
commit 9fd62c6d9cc009af02aa709d5aadba37e1ce5b5b
Author: Tom Clegg <tom at curoverse.com>
Date: Tue May 17 14:30:53 2016 -0400
9162: Really send trash/pull lists. Add tests.
diff --git a/sdk/go/x/arvados/client.go b/sdk/go/x/arvados/client.go
index f3f1286..0bbb879 100644
--- a/sdk/go/x/arvados/client.go
+++ b/sdk/go/x/arvados/client.go
@@ -52,6 +52,9 @@ func (c *Client) DoAndDecode(dst interface{}, req *http.Request) error {
if resp.StatusCode != 200 {
return fmt.Errorf("request failed (%s): %s", req.URL, resp.Status)
}
+ if dst == nil {
+ return nil
+ }
return json.Unmarshal(buf, dst)
}
diff --git a/sdk/go/x/arvados/collection.go b/sdk/go/x/arvados/collection.go
index 602b02c..e5b0922 100644
--- a/sdk/go/x/arvados/collection.go
+++ b/sdk/go/x/arvados/collection.go
@@ -126,7 +126,7 @@ func (c *Client) EachCollection(f func(Collection) error, progress func(done, to
}
last = coll
}
- if last.ModifiedAt != nil && *last.ModifiedAt == filterTime {
+ if last.ModifiedAt == nil || *last.ModifiedAt == filterTime {
if page.ItemsAvailable > len(page.Items) {
// TODO: use "mtime=X && UUID>Y"
// filters to get all collections with
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 25e8fef..dd8601d 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -360,10 +360,43 @@ func (bal *Balancer) PrintStatistics() {
fmt.Println("===")
}
-func (bal *Balancer) ApplyChangeSets(c *arvados.Client) error {
- defer timeMe()("ApplyChangeSets")
+func (bal *Balancer) CommitPulls(c *arvados.Client) error {
+ return bal.commitAsync(c, "send pull list",
+ func(srv *KeepService) error {
+ return srv.CommitPulls(c)
+ })
+}
+
+func (bal *Balancer) CommitTrash(c *arvados.Client) error {
+ return bal.commitAsync(c, "send trash list",
+ func(srv *KeepService) error {
+ return srv.CommitTrash(c)
+ })
+}
+
+func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
if err := bal.CheckSanityLate(); err != nil {
return err
}
- return fmt.Errorf("unimplemented (FIXME)")
+ errs := make(chan error)
+ for _, srv := range bal.KeepServices {
+ go func(srv *KeepService) {
+ label = fmt.Sprintf("%s: %v", srv, label)
+ defer timeMe()(label)
+ err := f(srv)
+ if err != nil {
+ err = fmt.Errorf("%s: %v", label, err)
+ }
+ errs <- err
+ }(srv)
+ }
+ var lastErr error
+ for _ = range bal.KeepServices {
+ if err := <-errs; err != nil {
+ log.Print(err)
+ lastErr = err
+ }
+ }
+ close(errs)
+ return lastErr
}
diff --git a/services/keep-balance/example-config.json b/services/keep-balance/example-config.json
index a6a6e58..b7cecee 100644
--- a/services/keep-balance/example-config.json
+++ b/services/keep-balance/example-config.json
@@ -4,7 +4,8 @@
"AuthToken": "zzzzzz",
"Insecure": false
},
- "Apply": false,
+ "CommitPull": true,
+ "CommitTrash": false,
"ServiceTypes": [
"disk"
]
diff --git a/services/keep-balance/keep_service.go b/services/keep-balance/keep_service.go
index e158728..ee9c3a1 100644
--- a/services/keep-balance/keep_service.go
+++ b/services/keep-balance/keep_service.go
@@ -1,7 +1,10 @@
package main
import (
+ "encoding/json"
"fmt"
+ "io"
+
"git.curoverse.com/arvados.git/sdk/go/x/arvados"
)
@@ -10,12 +13,36 @@ type KeepService struct {
ChangeSet
}
-func (srv KeepService) String() string {
+func (srv *KeepService) String() string {
return fmt.Sprintf("%s (%s:%d, %s)", srv.UUID, srv.ServiceHost, srv.ServicePort, srv.ServiceType)
}
var ksSchemes = map[bool]string{false: "http", true: "https"}
-func (srv KeepService) URLBase() string {
+func (srv *KeepService) URLBase() string {
return fmt.Sprintf("%s://%s:%d", ksSchemes[srv.ServiceSSLFlag], srv.ServiceHost, srv.ServicePort)
}
+
+func (srv *KeepService) CommitPulls(c *arvados.Client) error {
+ return srv.put(c, "/pull", srv.ChangeSet.Pulls)
+}
+
+func (srv *KeepService) CommitTrash(c *arvados.Client) error {
+ return srv.put(c, "/trash", srv.ChangeSet.Trashes)
+}
+
+func (srv *KeepService) put(c *arvados.Client, path string, data interface{}) error {
+ errC := make(chan error)
+ jsonR, jsonW := io.Pipe()
+ go func() {
+ enc := json.NewEncoder(jsonW)
+ errC <- enc.Encode(data)
+ }()
+ err := c.RequestAndDecode(nil, "PUT", path, jsonR, nil)
+ if encErr := <-errC; encErr != nil {
+ return fmt.Errorf("encoding data for %v: %v", path, encErr)
+ } else if err != nil {
+ return fmt.Errorf("PUT %v: %v", path, err)
+ }
+ return nil
+}
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
index 25ed818..6b66dd1 100644
--- a/services/keep-balance/main.go
+++ b/services/keep-balance/main.go
@@ -12,18 +12,27 @@ import (
type Config struct {
Client arvados.Client
- Apply bool
ServiceTypes []string
}
+type RunOptions struct {
+ CommitPulls bool
+ CommitTrash bool
+}
+
var debugf func(string, ...interface{}) = func(string, ...interface{}) {}
func main() {
var config Config
+ var runOptions RunOptions
configPath := flag.String("config",
os.Getenv("HOME")+"/.config/arvados/keep-balance.json",
"configuration file")
+ flag.BoolVar(&runOptions.CommitPulls, "commit-pulls", false,
+ "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
+ flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
+ "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
debugFlag := flag.Bool("debug", false, "enable debug messages")
flag.Parse()
@@ -41,12 +50,12 @@ func main() {
log.Printf("config is %s", j)
}
}
- if err := Run(config); err != nil {
+ if err := Run(config, runOptions); err != nil {
log.Fatal(err)
}
}
-func Run(config Config) error {
+func Run(config Config, runOptions RunOptions) error {
bal := Balancer{
Logger: log.New(os.Stderr, "", log.LstdFlags),
ServiceTypes: config.ServiceTypes,
@@ -64,8 +73,15 @@ func Run(config Config) error {
if err = bal.CheckSanityLate(); err != nil {
return err
}
- if config.Apply {
- err = bal.ApplyChangeSets(&config.Client)
+ if runOptions.CommitPulls {
+ err = bal.CommitPulls(&config.Client)
+ if err != nil {
+ // Skip trash if we can't pull. (Too cautious?)
+ return err
+ }
+ }
+ if runOptions.CommitTrash {
+ err = bal.CommitTrash(&config.Client)
}
return err
}
diff --git a/services/keep-balance/main_test.go b/services/keep-balance/main_test.go
new file mode 100644
index 0000000..f0479b6
--- /dev/null
+++ b/services/keep-balance/main_test.go
@@ -0,0 +1,142 @@
+package main
+
+import (
+ _ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "net/http/httptest"
+ "sync"
+
+ "git.curoverse.com/arvados.git/sdk/go/x/arvados"
+
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&mainSuite{})
+
+type stubServer struct {
+ mux *http.ServeMux
+ srv *httptest.Server
+ mutex sync.Mutex
+ Requests []http.Request
+}
+
+type mainSuite struct {
+ stub stubServer
+ config Config
+}
+
+func (s *stubServer) start() *http.Client {
+ // Set up a config.Client that forwards all requests to s.mux
+ // via s.srv. Test cases will attach handlers to s.mux to get
+ // the desired responses.
+ s.mux = http.NewServeMux()
+ s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ s.mutex.Lock()
+ s.Requests = append(s.Requests, *r)
+ s.mutex.Unlock()
+ w.Header().Set("Content-Type", "application/json")
+ s.mux.ServeHTTP(w, r)
+ }))
+ return &http.Client{Transport: s}
+}
+
+func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) {
+ w := httptest.NewRecorder()
+ s.mux.ServeHTTP(w, req)
+ return &http.Response{
+ StatusCode: w.Code,
+ Status: fmt.Sprintf("%d %s", w.Code, http.StatusText(w.Code)),
+ Header: w.HeaderMap,
+ Body: ioutil.NopCloser(w.Body)}, nil
+}
+
+func (s *stubServer) stop() {
+ s.srv.Close()
+}
+
+func (s *stubServer) serveStatic(path, data string) *int {
+ var count int
+ s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
+ count++
+ fmt.Fprintln(w, data)
+ })
+ return &count
+}
+
+func (s *stubServer) serveCurrentUserAdmin() *int {
+ return s.serveStatic("/arvados/v1/users/current",
+ `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":true,"is_active":true}`)
+}
+
+func (s *stubServer) serveCurrentUserNotAdmin() *int {
+ return s.serveStatic("/arvados/v1/users/current",
+ `{"uuid":"zzzzz-tpzed-000000000000000","is_admin":false,"is_active":true}`)
+}
+
+func (s *stubServer) serveDiscoveryDoc() *int {
+ return s.serveStatic("/discovery/v1/apis/arvados/v1/rest",
+ `{"default_collection_replication":2}`)
+}
+
+func (s *stubServer) serveZeroCollections() *int {
+ return s.serveStatic("/arvados/v1/collections",
+ `{"items":[],"items_available":0}`)
+}
+
+func (s *stubServer) serveZeroKeepServices() *int {
+ return s.serveStatic("/arvados/v1/keep_services",
+ `{"items":[],"items_available":0}`)
+}
+
+func (s *stubServer) serveKeepstoreTrash() *int {
+ return s.serveStatic("/trash", `{}`)
+}
+
+func (s *stubServer) serveKeepstorePull() *int {
+ return s.serveStatic("/pull", `{}`)
+}
+
+func (s *mainSuite) SetUpTest(c *check.C) {
+ s.config = Config{Client: arvados.Client{
+ AuthToken: "xyzzy",
+ APIHost: "zzzzz.arvadosapi.com",
+ Client: s.stub.start()}}
+}
+
+func (s *mainSuite) TearDownTest(c *check.C) {
+ s.stub.stop()
+}
+
+func (s *mainSuite) TestRefuseZeroCollections(c *check.C) {
+ opts := RunOptions{
+ CommitPulls: true,
+ CommitTrash: true,
+ }
+ s.stub.serveDiscoveryDoc()
+ s.stub.serveCurrentUserAdmin()
+ countCollectionsList := s.stub.serveZeroCollections()
+ s.stub.serveZeroKeepServices()
+ countTrash := s.stub.serveKeepstoreTrash()
+ err := Run(s.config, opts)
+ c.Check(err, check.ErrorMatches, "received zero collections")
+ c.Check(*countCollectionsList, check.Equals, 2)
+ c.Check(*countTrash, check.Equals, 0)
+}
+
+func (s *mainSuite) TestRefuseNonAdmin(c *check.C) {
+ opts := RunOptions{
+ CommitPulls: true,
+ CommitTrash: true,
+ }
+ s.stub.serveDiscoveryDoc()
+ countCurrentUser := s.stub.serveCurrentUserNotAdmin()
+ countColl := s.stub.serveZeroCollections()
+ countKS := s.stub.serveZeroKeepServices()
+ countTrash := s.stub.serveKeepstoreTrash()
+ err := Run(s.config, opts)
+ c.Check(err, check.ErrorMatches, "not admin")
+ c.Check(*countCurrentUser, check.Equals, 1)
+ c.Check(*countColl, check.Equals, 0)
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list