[ARVADOS] created: 2.1.0-2170-g22e26e694
Git user
git at public.arvados.org
Mon Apr 11 14:01:16 UTC 2022
at 22e26e6943d38167ec1e0c5ec7c79202ae4d3a41 (commit)
commit 22e26e6943d38167ec1e0c5ec7c79202ae4d3a41
Author: Tom Clegg <tom at curii.com>
Date: Mon Apr 4 10:57:24 2022 -0400
18699: Print booted clusterID and controller host:port on stdout.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/boot/cmd.go b/lib/boot/cmd.go
index 6a32ab142..15af548e9 100644
--- a/lib/boot/cmd.go
+++ b/lib/boot/cmd.go
@@ -10,6 +10,7 @@ import (
"flag"
"fmt"
"io"
+ "sort"
"time"
"git.arvados.org/arvados.git/lib/cmd"
@@ -100,15 +101,29 @@ func (bcmd bootCommand) run(ctx context.Context, prog string, args []string, std
} else if !ok {
super.logger.Error("boot failed")
} else {
- // Write each cluster's controller URL to stdout.
- // Nothing else goes to stdout, so this allows a
- // calling script to determine when the cluster is
- // ready to use, and the controller's host:port (which
- // may have been dynamically assigned depending on
- // config/options).
- for _, cc := range super.Clusters() {
- fmt.Fprintln(stdout, cc.Services.Controller.ExternalURL)
+ // Write each cluster's controller URL, id, and URL
+ // host:port to stdout. Nothing else goes to stdout,
+ // so this allows a calling script to determine when
+ // the cluster is ready to use, and the controller's
+ // host:port (which may have been dynamically assigned
+ // depending on config/options).
+ //
+ // Sort output by cluster ID for convenience.
+ var ids []string
+ for id := range super.Clusters() {
+ ids = append(ids, id)
}
+ sort.Strings(ids)
+ for _, id := range ids {
+ cc := super.Cluster(id)
+ // Providing both scheme://host:port and
+ // host:port is redundant, but convenient.
+ fmt.Fprintln(stdout, cc.Services.Controller.ExternalURL, id, cc.Services.Controller.ExternalURL.Host)
+ }
+ // Write ".\n" to mark the end of the list of
+ // controllers, in case the caller doesn't already
+ // know how many clusters are coming up.
+ fmt.Fprintln(stdout, ".")
if *shutdown {
super.Stop()
// Wait for children to exit. Don't report the
commit 03ba71643b9ef0c4bad1919b7fc9553874b2a672
Author: Tom Clegg <tom at curii.com>
Date: Mon Apr 4 10:34:46 2022 -0400
18699: Remove unneeded OIDC config.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/tools/sync-groups/federation_test.go b/tools/sync-groups/federation_test.go
index 71c119156..330c22be3 100644
--- a/tools/sync-groups/federation_test.go
+++ b/tools/sync-groups/federation_test.go
@@ -12,7 +12,6 @@ import (
"git.arvados.org/arvados.git/lib/boot"
"git.arvados.org/arvados.git/sdk/go/arvados"
- "git.arvados.org/arvados.git/sdk/go/arvadostest"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
check "gopkg.in/check.v1"
)
@@ -22,21 +21,13 @@ var _ = check.Suite(&FederationSuite{})
var origAPIHost, origAPIToken string
type FederationSuite struct {
- super *boot.Supervisor
- oidcprovider *arvadostest.OIDCProvider
+ super *boot.Supervisor
}
func (s *FederationSuite) SetUpSuite(c *check.C) {
origAPIHost = os.Getenv("ARVADOS_API_HOST")
origAPIToken = os.Getenv("ARVADOS_API_TOKEN")
- s.oidcprovider = arvadostest.NewOIDCProvider(c)
- s.oidcprovider.AuthEmail = "user at example.com"
- s.oidcprovider.AuthEmailVerified = true
- s.oidcprovider.AuthName = "Example User"
- s.oidcprovider.ValidClientID = "clientid"
- s.oidcprovider.ValidClientSecret = "clientsecret"
-
hostport := map[string]string{}
for _, id := range []string{"z1111", "z2222"} {
hostport[id] = func() string {
@@ -84,13 +75,8 @@ func (s *FederationSuite) SetUpSuite(c *check.C) {
yaml += `
Login:
LoginCluster: z1111
- OpenIDConnect:
+ PAM:
Enable: true
- Issuer: ` + s.oidcprovider.Issuer.URL + `
- ClientID: ` + s.oidcprovider.ValidClientID + `
- ClientSecret: ` + s.oidcprovider.ValidClientSecret + `
- EmailClaim: email
- EmailVerifiedClaim: email_verified
`
} else {
yaml += `
@@ -118,7 +104,7 @@ func (s *FederationSuite) SetUpSuite(c *check.C) {
// Activate user, make it admin.
conn1 := s.super.Conn("z1111")
rootctx1, _, _ := s.super.RootClients("z1111")
- userctx1, _, _, _ := s.super.UserClients("z1111", rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
+ userctx1, _, _, _ := s.super.UserClients("z1111", rootctx1, c, conn1, "admin at example.com", true)
user1, err := conn1.UserGetCurrent(userctx1, arvados.GetOptions{})
c.Assert(err, check.IsNil)
c.Assert(user1.IsAdmin, check.Equals, false)
@@ -142,7 +128,7 @@ func (s *FederationSuite) TestGroupSyncingOnFederatedCluster(c *check.C) {
// Get admin user's V2 token
conn1 := s.super.Conn("z1111")
rootctx1, _, _ := s.super.RootClients("z1111")
- userctx1, _, _, _ := s.super.UserClients("z1111", rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
+ userctx1, _, _, _ := s.super.UserClients("z1111", rootctx1, c, conn1, "admin at example.com", true)
user1Auth, err := conn1.APIClientAuthorizationCurrent(userctx1, arvados.GetOptions{})
c.Check(err, check.IsNil)
userV2Token := user1Auth.TokenV2()
commit d44ae11e9ca2bc0ba8cad485a4d5491de625656a
Author: Tom Clegg <tom at curii.com>
Date: Mon Apr 4 10:18:07 2022 -0400
18699: boot.Supervisor support for multi-cluster config file.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/boot/cert.go b/lib/boot/cert.go
index 2b38dab05..916f9f53b 100644
--- a/lib/boot/cert.go
+++ b/lib/boot/cert.go
@@ -26,20 +26,8 @@ func (createCertificates) String() string {
}
func (createCertificates) Run(ctx context.Context, fail func(error), super *Supervisor) error {
- var san string
- if net.ParseIP(super.ListenHost) != nil {
- san += fmt.Sprintf(",IP:%s", super.ListenHost)
- } else {
- san += fmt.Sprintf(",DNS:%s", super.ListenHost)
- }
- hostname, err := os.Hostname()
- if err != nil {
- return fmt.Errorf("hostname: %w", err)
- }
- san += ",DNS:" + hostname
-
// Generate root key
- err = super.RunProgram(ctx, super.tempdir, runOptions{}, "openssl", "genrsa", "-out", "rootCA.key", "4096")
+ err := super.RunProgram(ctx, super.tempdir, runOptions{}, "openssl", "genrsa", "-out", "rootCA.key", "4096")
if err != nil {
return err
}
@@ -58,7 +46,20 @@ func (createCertificates) Run(ctx context.Context, fail func(error), super *Supe
if err != nil {
return err
}
- err = ioutil.WriteFile(filepath.Join(super.tempdir, "server.cfg"), append(defaultconf, []byte(fmt.Sprintf("\n[SAN]\nsubjectAltName=DNS:localhost,DNS:localhost.localdomain%s\n", san))...), 0644)
+ hostname, err := os.Hostname()
+ if err != nil {
+ return fmt.Errorf("hostname: %w", err)
+ }
+ san := "DNS:localhost,DNS:localhost.localdomain,DNS:" + hostname
+ if super.ListenHost == hostname || super.ListenHost == "localhost" {
+ // already have it
+ } else if net.ParseIP(super.ListenHost) != nil {
+ san += fmt.Sprintf(",IP:%s", super.ListenHost)
+ } else {
+ san += fmt.Sprintf(",DNS:%s", super.ListenHost)
+ }
+ conf := append(defaultconf, []byte(fmt.Sprintf("\n[SAN]\nsubjectAltName=%s\n", san))...)
+ err = ioutil.WriteFile(filepath.Join(super.tempdir, "server.cfg"), conf, 0644)
if err != nil {
return err
}
diff --git a/lib/boot/cmd.go b/lib/boot/cmd.go
index 9c3047a7b..6a32ab142 100644
--- a/lib/boot/cmd.go
+++ b/lib/boot/cmd.go
@@ -13,7 +13,6 @@ import (
"time"
"git.arvados.org/arvados.git/lib/cmd"
- "git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
)
@@ -56,17 +55,17 @@ func (bcmd bootCommand) run(ctx context.Context, prog string, args []string, std
ctx, cancel := context.WithCancel(ctx)
defer cancel()
super := &Supervisor{
+ Stdin: stdin,
Stderr: stderr,
logger: ctxlog.FromContext(ctx),
}
flags := flag.NewFlagSet(prog, flag.ContinueOnError)
- loader := config.NewLoader(stdin, super.logger)
- loader.SetupFlags(flags)
versionFlag := flags.Bool("version", false, "Write version information to stdout and exit 0")
+ flags.StringVar(&super.ConfigPath, "config", "/etc/arvados/config.yml", "arvados config file `path`")
flags.StringVar(&super.SourcePath, "source", ".", "arvados source tree `directory`")
flags.StringVar(&super.ClusterType, "type", "production", "cluster `type`: development, test, or production")
- flags.StringVar(&super.ListenHost, "listen-host", "localhost", "host name or interface address for service listeners")
+ flags.StringVar(&super.ListenHost, "listen-host", "localhost", "host name or interface address for external services, and internal services whose InternalURLs are not configured")
flags.StringVar(&super.ControllerAddr, "controller-address", ":0", "desired controller address, `host:port` or `:port`")
flags.StringVar(&super.Workbench2Source, "workbench2-source", "../arvados-workbench2", "path to arvados-workbench2 source tree")
flags.BoolVar(&super.NoWorkbench1, "no-workbench1", false, "do not run workbench1")
@@ -87,13 +86,7 @@ func (bcmd bootCommand) run(ctx context.Context, prog string, args []string, std
return fmt.Errorf("cluster type must be 'development', 'test', or 'production'")
}
- loader.SkipAPICalls = true
- cfg, err := loader.Load()
- if err != nil {
- return err
- }
-
- super.Start(ctx, cfg, loader.Path)
+ super.Start(ctx)
defer super.Stop()
var timer *time.Timer
@@ -101,17 +94,21 @@ func (bcmd bootCommand) run(ctx context.Context, prog string, args []string, std
timer = time.AfterFunc(*timeout, super.Stop)
}
- url, ok := super.WaitReady()
+ ok := super.WaitReady()
if timer != nil && !timer.Stop() {
return errors.New("boot timed out")
} else if !ok {
super.logger.Error("boot failed")
} else {
- // Write controller URL to stdout. Nothing else goes
- // to stdout, so this provides an easy way for a
- // calling script to discover the controller URL when
- // everything is ready.
- fmt.Fprintln(stdout, url)
+ // Write each cluster's controller URL to stdout.
+ // Nothing else goes to stdout, so this allows a
+ // calling script to determine when the cluster is
+ // ready to use, and the controller's host:port (which
+ // may have been dynamically assigned depending on
+ // config/options).
+ for _, cc := range super.Clusters() {
+ fmt.Fprintln(stdout, cc.Services.Controller.ExternalURL)
+ }
if *shutdown {
super.Stop()
// Wait for children to exit. Don't report the
diff --git a/lib/boot/helpers.go b/lib/boot/helpers.go
index 731fc56c8..77036e934 100644
--- a/lib/boot/helpers.go
+++ b/lib/boot/helpers.go
@@ -9,73 +9,24 @@ import (
"net/url"
"git.arvados.org/arvados.git/lib/controller/rpc"
- "git.arvados.org/arvados.git/lib/service"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/auth"
- "git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/keepclient"
"gopkg.in/check.v1"
)
-// TestCluster stores a working test cluster data
-type TestCluster struct {
- Super Supervisor
- Config arvados.Config
- ControllerURL *url.URL
- ClusterID string
-}
-
-type logger struct {
- loggerfunc func(...interface{})
-}
-
-func (l logger) Log(args ...interface{}) {
- l.loggerfunc(args)
-}
-
-// NewTestCluster loads the provided configuration, and sets up a test cluster
-// ready for being started.
-func NewTestCluster(srcPath, clusterID string, cfg *arvados.Config, listenHost string, logWriter func(...interface{})) *TestCluster {
- return &TestCluster{
- Super: Supervisor{
- SourcePath: srcPath,
- ClusterType: "test",
- ListenHost: listenHost,
- ControllerAddr: ":0",
- OwnTemporaryDatabase: true,
- Stderr: &service.LogPrefixer{
- Writer: ctxlog.LogWriter(logWriter),
- Prefix: []byte("[" + clusterID + "] ")},
- },
- Config: *cfg,
- ClusterID: clusterID,
- }
-}
-
-// Start the test cluster.
-func (tc *TestCluster) Start() {
- tc.Super.Start(context.Background(), &tc.Config, "-")
-}
-
-// WaitReady waits for all components to report healthy, and finishes setting
-// up the TestCluster struct.
-func (tc *TestCluster) WaitReady() bool {
- au, ok := tc.Super.WaitReady()
- if !ok {
- return ok
- }
- u := url.URL(*au)
- tc.ControllerURL = &u
- return ok
-}
-
// ClientsWithToken returns Context, Arvados.Client and keepclient structs
// initialized to connect to the cluster with the supplied Arvados token.
-func (tc *TestCluster) ClientsWithToken(token string) (context.Context, *arvados.Client, *keepclient.KeepClient) {
- cl := tc.Config.Clusters[tc.ClusterID]
- ctx := auth.NewContext(context.Background(), auth.NewCredentials(token))
- ac, err := arvados.NewClientFromConfig(&cl)
+func (super *Supervisor) ClientsWithToken(clusterID, token string) (context.Context, *arvados.Client, *keepclient.KeepClient) {
+ cl := super.cluster
+ if super.children != nil {
+ cl = super.children[clusterID].cluster
+ } else if clusterID != cl.ClusterID {
+ panic("bad clusterID " + clusterID)
+ }
+ ctx := auth.NewContext(super.ctx, auth.NewCredentials(token))
+ ac, err := arvados.NewClientFromConfig(cl)
if err != nil {
panic(err)
}
@@ -92,7 +43,7 @@ func (tc *TestCluster) ClientsWithToken(token string) (context.Context, *arvados
// initialize clients with the API token, set up the user and
// optionally activate the user. Return client structs for
// communicating with the cluster on behalf of the 'example' user.
-func (tc *TestCluster) UserClients(rootctx context.Context, c *check.C, conn *rpc.Conn, authEmail string, activate bool) (context.Context, *arvados.Client, *keepclient.KeepClient, arvados.User) {
+func (super *Supervisor) UserClients(clusterID string, rootctx context.Context, c *check.C, conn *rpc.Conn, authEmail string, activate bool) (context.Context, *arvados.Client, *keepclient.KeepClient, arvados.User) {
login, err := conn.UserSessionCreate(rootctx, rpc.UserSessionCreateOptions{
ReturnTo: ",https://example.com",
AuthInfo: rpc.UserSessionAuthInfo{
@@ -107,7 +58,7 @@ func (tc *TestCluster) UserClients(rootctx context.Context, c *check.C, conn *rp
c.Assert(err, check.IsNil)
userToken := redirURL.Query().Get("api_token")
c.Logf("user token: %q", userToken)
- ctx, ac, kc := tc.ClientsWithToken(userToken)
+ ctx, ac, kc := super.ClientsWithToken(clusterID, userToken)
user, err := conn.UserGetCurrent(ctx, arvados.GetOptions{})
c.Assert(err, check.IsNil)
_, err = conn.UserSetup(rootctx, arvados.UserSetupOptions{UUID: user.UUID})
@@ -127,18 +78,19 @@ func (tc *TestCluster) UserClients(rootctx context.Context, c *check.C, conn *rp
// RootClients returns Context, arvados.Client and keepclient structs initialized
// to communicate with the cluster as the system root user.
-func (tc *TestCluster) RootClients() (context.Context, *arvados.Client, *keepclient.KeepClient) {
- return tc.ClientsWithToken(tc.Config.Clusters[tc.ClusterID].SystemRootToken)
+func (super *Supervisor) RootClients(clusterID string) (context.Context, *arvados.Client, *keepclient.KeepClient) {
+ return super.ClientsWithToken(clusterID, super.Cluster(clusterID).SystemRootToken)
}
// AnonymousClients returns Context, arvados.Client and keepclient structs initialized
// to communicate with the cluster as the anonymous user.
-func (tc *TestCluster) AnonymousClients() (context.Context, *arvados.Client, *keepclient.KeepClient) {
- return tc.ClientsWithToken(tc.Config.Clusters[tc.ClusterID].Users.AnonymousUserToken)
+func (super *Supervisor) AnonymousClients(clusterID string) (context.Context, *arvados.Client, *keepclient.KeepClient) {
+ return super.ClientsWithToken(clusterID, super.Cluster(clusterID).Users.AnonymousUserToken)
}
// Conn gets rpc connection struct initialized to communicate with the
// specified cluster.
-func (tc *TestCluster) Conn() *rpc.Conn {
- return rpc.NewConn(tc.ClusterID, tc.ControllerURL, true, rpc.PassthroughTokenProvider)
+func (super *Supervisor) Conn(clusterID string) *rpc.Conn {
+ controllerURL := url.URL(super.Cluster(clusterID).Services.Controller.ExternalURL)
+ return rpc.NewConn(clusterID, &controllerURL, true, rpc.PassthroughTokenProvider)
}
diff --git a/lib/boot/supervisor.go b/lib/boot/supervisor.go
index 323f67234..9a1cc50c5 100644
--- a/lib/boot/supervisor.go
+++ b/lib/boot/supervisor.go
@@ -37,25 +37,47 @@ import (
)
type Supervisor struct {
- SourcePath string // e.g., /home/username/src/arvados
- SourceVersion string // e.g., acbd1324...
- ClusterType string // e.g., production
- ListenHost string // e.g., localhost
- ControllerAddr string // e.g., 127.0.0.1:8000
- Workbench2Source string // e.g., /home/username/src/arvados-workbench2
+ // Config file location like "/etc/arvados/config.yml", or "-"
+ // to read from Stdin (see below).
+ ConfigPath string
+ // Literal config file (useful for test suites). If non-empty,
+ // this is used instead of ConfigPath.
+ ConfigYAML string
+ // Path to arvados source tree. Only used for dev/test
+ // clusters.
+ SourcePath string
+ // Version number to build into binaries. Only used for
+ // dev/test clusters.
+ SourceVersion string
+ // "production", "development", or "test".
+ ClusterType string
+ // Listening address for external services, and internal
+ // services whose InternalURLs are not explicitly configured.
+ // If blank, listen on the configured controller ExternalURL
+ // host; if that is also blank, listen on all addresses
+ // (0.0.0.0).
+ ListenHost string
+ // Default host:port for controller ExternalURL if not
+ // explicitly configured in config file. If blank, use a
+ // random port on ListenHost.
+ ControllerAddr string
+ // Path to arvados-workbench2 source tree checkout.
+ Workbench2Source string
NoWorkbench1 bool
NoWorkbench2 bool
OwnTemporaryDatabase bool
+ Stdin io.Reader
Stderr io.Writer
- logger logrus.FieldLogger
- cluster *arvados.Cluster
+ logger logrus.FieldLogger
+ cluster *arvados.Cluster // nil if this is a multi-cluster supervisor
+ children map[string]*Supervisor // nil if this is a single-cluster supervisor
ctx context.Context
cancel context.CancelFunc
- done chan struct{} // closed when child procs/services have shut down
- err error // error that caused shutdown (valid when done is closed)
- healthChecker *health.Aggregator
+ done chan struct{} // closed when child procs/services have shut down
+ err error // error that caused shutdown (valid when done is closed)
+ healthChecker *health.Aggregator // nil if this is a multi-cluster supervisor, or still booting
tasksReady map[string]chan bool
waitShutdown sync.WaitGroup
@@ -66,73 +88,161 @@ type Supervisor struct {
environ []string // for child processes
}
-func (super *Supervisor) Cluster() *arvados.Cluster { return super.cluster }
+func (super *Supervisor) Clusters() map[string]*arvados.Cluster {
+ m := map[string]*arvados.Cluster{}
+ if super.cluster != nil {
+ m[super.cluster.ClusterID] = super.cluster
+ }
+ for id, super2 := range super.children {
+ m[id] = super2.Cluster("")
+ }
+ return m
+}
+
+func (super *Supervisor) Cluster(id string) *arvados.Cluster {
+ if super.children != nil {
+ return super.children[id].Cluster(id)
+ } else {
+ return super.cluster
+ }
+}
-func (super *Supervisor) Start(ctx context.Context, cfg *arvados.Config, cfgPath string) {
+func (super *Supervisor) Start(ctx context.Context) {
+ super.logger = ctxlog.FromContext(ctx)
super.ctx, super.cancel = context.WithCancel(ctx)
super.done = make(chan struct{})
+ sigch := make(chan os.Signal)
+ signal.Notify(sigch, syscall.SIGINT, syscall.SIGTERM)
+ defer signal.Stop(sigch)
go func() {
- defer close(super.done)
+ for sig := range sigch {
+ super.logger.WithField("signal", sig).Info("caught signal")
+ if super.err == nil {
+ super.err = fmt.Errorf("caught signal %s", sig)
+ }
+ super.cancel()
+ }
+ }()
- sigch := make(chan os.Signal)
- signal.Notify(sigch, syscall.SIGINT, syscall.SIGTERM)
- defer signal.Stop(sigch)
- go func() {
- for sig := range sigch {
- super.logger.WithField("signal", sig).Info("caught signal")
- if super.err == nil {
- super.err = fmt.Errorf("caught signal %s", sig)
- }
- super.cancel()
+ hupch := make(chan os.Signal)
+ signal.Notify(hupch, syscall.SIGHUP)
+ defer signal.Stop(hupch)
+ go func() {
+ for sig := range hupch {
+ super.logger.WithField("signal", sig).Info("caught signal")
+ if super.err == nil {
+ super.err = errNeedConfigReload
}
- }()
+ super.cancel()
+ }
+ }()
+
+ loaderStdin := super.Stdin
+ if super.ConfigYAML != "" {
+ loaderStdin = bytes.NewBufferString(super.ConfigYAML)
+ }
+ loader := config.NewLoader(loaderStdin, super.logger)
+ loader.SkipLegacy = true
+ loader.SkipAPICalls = true
+ loader.Path = super.ConfigPath
+ if super.ConfigYAML != "" {
+ loader.Path = "-"
+ }
+ cfg, err := loader.Load()
+ if err != nil {
+ super.err = err
+ close(super.done)
+ super.cancel()
+ return
+ }
+
+ if super.ConfigPath != "" && super.ConfigPath != "-" && cfg.AutoReloadConfig {
+ go watchConfig(super.ctx, super.logger, super.ConfigPath, copyConfig(cfg), func() {
+ if super.err == nil {
+ super.err = errNeedConfigReload
+ }
+ super.cancel()
+ })
+ }
- hupch := make(chan os.Signal)
- signal.Notify(hupch, syscall.SIGHUP)
- defer signal.Stop(hupch)
+ if len(cfg.Clusters) > 1 {
+ super.startFederation(cfg)
go func() {
- for sig := range hupch {
- super.logger.WithField("signal", sig).Info("caught signal")
+ defer super.cancel()
+ defer close(super.done)
+ for _, super2 := range super.children {
+ err := super2.Wait()
if super.err == nil {
- super.err = errNeedConfigReload
+ super.err = err
}
- super.cancel()
}
}()
-
- if cfgPath != "" && cfgPath != "-" && cfg.AutoReloadConfig {
- go watchConfig(super.ctx, super.logger, cfgPath, copyConfig(cfg), func() {
+ } else {
+ go func() {
+ defer super.cancel()
+ defer close(super.done)
+ super.cluster, super.err = cfg.GetCluster("")
+ if super.err != nil {
+ return
+ }
+ err := super.runCluster()
+ if err != nil {
+ super.logger.WithError(err).Info("supervisor shut down")
if super.err == nil {
- super.err = errNeedConfigReload
+ super.err = err
}
- super.cancel()
- })
- }
-
- err := super.run(cfg)
- if err != nil {
- super.logger.WithError(err).Warn("supervisor shut down")
- if super.err == nil {
- super.err = err
}
- }
- }()
+ }()
+ }
}
+// Wait returns when all child processes and goroutines have exited.
func (super *Supervisor) Wait() error {
<-super.done
return super.err
}
-func (super *Supervisor) run(cfg *arvados.Config) error {
- defer super.cancel()
+// startFederation starts a child Supervisor for each cluster in the
+// given config. Each is a copy of the original/parent with the
+// original config reduced to a single cluster.
+func (super *Supervisor) startFederation(cfg *arvados.Config) {
+ super.children = map[string]*Supervisor{}
+ for id, cc := range cfg.Clusters {
+ super2 := *super
+ yaml, err := json.Marshal(arvados.Config{Clusters: map[string]arvados.Cluster{id: cc}})
+ if err != nil {
+ panic(fmt.Sprintf("json.Marshal partial config: %s", err))
+ }
+ super2.ConfigYAML = string(yaml)
+ super2.ConfigPath = "-"
+ super2.children = nil
+
+ if super2.ClusterType == "test" {
+ super2.Stderr = &service.LogPrefixer{
+ Writer: super.Stderr,
+ Prefix: []byte("[" + id + "] "),
+ }
+ }
+ super2.Start(super.ctx)
+ super.children[id] = &super2
+ }
+}
+func (super *Supervisor) runCluster() error {
cwd, err := os.Getwd()
if err != nil {
return err
}
- if !strings.HasPrefix(super.SourcePath, "/") {
+ if super.ClusterType == "test" && super.SourcePath == "" {
+ // When invoked by test suite, default to current
+ // source tree
+ buf, err := exec.Command("git", "rev-parse", "--show-toplevel").CombinedOutput()
+ if err != nil {
+ return fmt.Errorf("git rev-parse: %w", err)
+ }
+ super.SourcePath = strings.TrimSuffix(string(buf), "\n")
+ } else if !strings.HasPrefix(super.SourcePath, "/") {
super.SourcePath = filepath.Join(cwd, super.SourcePath)
}
super.SourcePath, err = filepath.EvalSymlinks(super.SourcePath)
@@ -140,6 +250,18 @@ func (super *Supervisor) run(cfg *arvados.Config) error {
return err
}
+ if super.ListenHost == "" {
+ if urlhost := super.cluster.Services.Controller.ExternalURL.Host; urlhost != "" {
+ if h, _, _ := net.SplitHostPort(urlhost); h != "" {
+ super.ListenHost = h
+ } else {
+ super.ListenHost = urlhost
+ }
+ } else {
+ super.ListenHost = "0.0.0.0"
+ }
+ }
+
// Choose bin and temp dirs: /var/lib/arvados/... in
// production, transient tempdir otherwise.
if super.ClusterType == "production" {
@@ -164,7 +286,7 @@ func (super *Supervisor) run(cfg *arvados.Config) error {
// Fill in any missing config keys, and write the resulting
// config in the temp dir for child services to use.
- err = super.autofillConfig(cfg)
+ err = super.autofillConfig()
if err != nil {
return err
}
@@ -173,7 +295,9 @@ func (super *Supervisor) run(cfg *arvados.Config) error {
return err
}
defer conffile.Close()
- err = json.NewEncoder(conffile).Encode(cfg)
+ err = json.NewEncoder(conffile).Encode(arvados.Config{
+ Clusters: map[string]arvados.Cluster{
+ super.cluster.ClusterID: *super.cluster}})
if err != nil {
return err
}
@@ -193,10 +317,6 @@ func (super *Supervisor) run(cfg *arvados.Config) error {
super.prependEnv("PATH", super.tempdir+"/bin:")
}
- super.cluster, err = cfg.GetCluster("")
- if err != nil {
- return err
- }
// Now that we have the config, replace the bootstrap logger
// with a new one according to the logging config.
loglevel := super.cluster.SystemLogs.LogLevel
@@ -307,36 +427,60 @@ func (super *Supervisor) run(cfg *arvados.Config) error {
}
func (super *Supervisor) wait(ctx context.Context, tasks ...supervisedTask) error {
+ ticker := time.NewTicker(15 * time.Second)
+ defer ticker.Stop()
for _, task := range tasks {
ch, ok := super.tasksReady[task.String()]
if !ok {
return fmt.Errorf("no such task: %s", task)
}
super.logger.WithField("task", task.String()).Info("waiting")
- select {
- case <-ch:
- super.logger.WithField("task", task.String()).Info("ready")
- case <-ctx.Done():
- super.logger.WithField("task", task.String()).Info("task was never ready")
- return ctx.Err()
+ for {
+ select {
+ case <-ch:
+ super.logger.WithField("task", task.String()).Info("ready")
+ case <-ctx.Done():
+ super.logger.WithField("task", task.String()).Info("task was never ready")
+ return ctx.Err()
+ case <-ticker.C:
+ super.logger.WithField("task", task.String()).Info("still waiting...")
+ continue
+ }
+ break
}
}
return nil
}
+// Stop shuts down all child processes and goroutines, and returns
+// when all of them have exited.
func (super *Supervisor) Stop() {
super.cancel()
<-super.done
}
-func (super *Supervisor) WaitReady() (*arvados.URL, bool) {
+// WaitReady waits for the cluster(s) to be ready to handle requests,
+// then returns true. If startup fails, it returns false.
+func (super *Supervisor) WaitReady() bool {
+ if super.children != nil {
+ for id, super2 := range super.children {
+ super.logger.Infof("waiting for %s to be ready", id)
+ if !super2.WaitReady() {
+ super.logger.Infof("%s startup failed", id)
+ return false
+ }
+ super.logger.Infof("%s is ready", id)
+ }
+ super.logger.Info("all clusters are ready")
+ return true
+ }
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for waiting := "all"; waiting != ""; {
select {
case <-ticker.C:
case <-super.ctx.Done():
- return nil, false
+ return false
}
if super.healthChecker == nil {
// not set up yet
@@ -358,8 +502,7 @@ func (super *Supervisor) WaitReady() (*arvados.URL, bool) {
super.logger.WithField("targets", waiting[1:]).Info("waiting")
}
}
- u := super.cluster.Services.Controller.ExternalURL
- return &u, true
+ return true
}
func (super *Supervisor) prependEnv(key, prepend string) {
@@ -631,11 +774,7 @@ func (super *Supervisor) RunProgram(ctx context.Context, dir string, opts runOpt
return nil
}
-func (super *Supervisor) autofillConfig(cfg *arvados.Config) error {
- cluster, err := cfg.GetCluster("")
- if err != nil {
- return err
- }
+func (super *Supervisor) autofillConfig() error {
usedPort := map[string]bool{}
nextPort := func(host string) (string, error) {
for {
@@ -653,39 +792,39 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config) error {
return port, nil
}
}
- if cluster.Services.Controller.ExternalURL.Host == "" {
+ if super.cluster.Services.Controller.ExternalURL.Host == "" {
h, p, err := net.SplitHostPort(super.ControllerAddr)
- if err != nil {
- return fmt.Errorf("SplitHostPort(ControllerAddr): %w", err)
+ if err != nil && super.ControllerAddr != "" {
+ return fmt.Errorf("SplitHostPort(ControllerAddr %q): %w", super.ControllerAddr, err)
}
if h == "" {
h = super.ListenHost
}
- if p == "0" {
+ if p == "0" || p == "" {
p, err = nextPort(h)
if err != nil {
return err
}
}
- cluster.Services.Controller.ExternalURL = arvados.URL{Scheme: "https", Host: net.JoinHostPort(h, p), Path: "/"}
+ super.cluster.Services.Controller.ExternalURL = arvados.URL{Scheme: "https", Host: net.JoinHostPort(h, p), Path: "/"}
}
- u := url.URL(cluster.Services.Controller.ExternalURL)
+ u := url.URL(super.cluster.Services.Controller.ExternalURL)
defaultExtHost := u.Hostname()
for _, svc := range []*arvados.Service{
- &cluster.Services.Controller,
- &cluster.Services.DispatchCloud,
- &cluster.Services.GitHTTP,
- &cluster.Services.Health,
- &cluster.Services.Keepproxy,
- &cluster.Services.Keepstore,
- &cluster.Services.RailsAPI,
- &cluster.Services.WebDAV,
- &cluster.Services.WebDAVDownload,
- &cluster.Services.Websocket,
- &cluster.Services.Workbench1,
- &cluster.Services.Workbench2,
+ &super.cluster.Services.Controller,
+ &super.cluster.Services.DispatchCloud,
+ &super.cluster.Services.GitHTTP,
+ &super.cluster.Services.Health,
+ &super.cluster.Services.Keepproxy,
+ &super.cluster.Services.Keepstore,
+ &super.cluster.Services.RailsAPI,
+ &super.cluster.Services.WebDAV,
+ &super.cluster.Services.WebDAVDownload,
+ &super.cluster.Services.Websocket,
+ &super.cluster.Services.Workbench1,
+ &super.cluster.Services.Workbench2,
} {
- if svc == &cluster.Services.DispatchCloud && super.ClusterType == "test" {
+ if svc == &super.cluster.Services.DispatchCloud && super.ClusterType == "test" {
continue
}
if svc.ExternalURL.Host == "" {
@@ -694,21 +833,21 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config) error {
return err
}
host := net.JoinHostPort(defaultExtHost, port)
- if svc == &cluster.Services.Controller ||
- svc == &cluster.Services.GitHTTP ||
- svc == &cluster.Services.Health ||
- svc == &cluster.Services.Keepproxy ||
- svc == &cluster.Services.WebDAV ||
- svc == &cluster.Services.WebDAVDownload ||
- svc == &cluster.Services.Workbench1 ||
- svc == &cluster.Services.Workbench2 {
+ if svc == &super.cluster.Services.Controller ||
+ svc == &super.cluster.Services.GitHTTP ||
+ svc == &super.cluster.Services.Health ||
+ svc == &super.cluster.Services.Keepproxy ||
+ svc == &super.cluster.Services.WebDAV ||
+ svc == &super.cluster.Services.WebDAVDownload ||
+ svc == &super.cluster.Services.Workbench1 ||
+ svc == &super.cluster.Services.Workbench2 {
svc.ExternalURL = arvados.URL{Scheme: "https", Host: host, Path: "/"}
- } else if svc == &cluster.Services.Websocket {
+ } else if svc == &super.cluster.Services.Websocket {
svc.ExternalURL = arvados.URL{Scheme: "wss", Host: host, Path: "/websocket"}
}
}
- if super.NoWorkbench1 && svc == &cluster.Services.Workbench1 ||
- super.NoWorkbench2 && svc == &cluster.Services.Workbench2 {
+ if super.NoWorkbench1 && svc == &super.cluster.Services.Workbench1 ||
+ super.NoWorkbench2 && svc == &super.cluster.Services.Workbench2 {
// When workbench1 is disabled, it gets an
// ExternalURL (so we have a valid listening
// port to write in our Nginx config) but no
@@ -728,26 +867,26 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config) error {
}
}
if super.ClusterType != "production" {
- if cluster.SystemRootToken == "" {
- cluster.SystemRootToken = randomHexString(64)
+ if super.cluster.SystemRootToken == "" {
+ super.cluster.SystemRootToken = randomHexString(64)
}
- if cluster.ManagementToken == "" {
- cluster.ManagementToken = randomHexString(64)
+ if super.cluster.ManagementToken == "" {
+ super.cluster.ManagementToken = randomHexString(64)
}
- if cluster.Collections.BlobSigningKey == "" {
- cluster.Collections.BlobSigningKey = randomHexString(64)
+ if super.cluster.Collections.BlobSigningKey == "" {
+ super.cluster.Collections.BlobSigningKey = randomHexString(64)
}
- if cluster.Users.AnonymousUserToken == "" {
- cluster.Users.AnonymousUserToken = randomHexString(64)
+ if super.cluster.Users.AnonymousUserToken == "" {
+ super.cluster.Users.AnonymousUserToken = randomHexString(64)
}
- if cluster.Containers.DispatchPrivateKey == "" {
+ if super.cluster.Containers.DispatchPrivateKey == "" {
buf, err := ioutil.ReadFile(filepath.Join(super.SourcePath, "lib", "dispatchcloud", "test", "sshkey_dispatch"))
if err != nil {
return err
}
- cluster.Containers.DispatchPrivateKey = string(buf)
+ super.cluster.Containers.DispatchPrivateKey = string(buf)
}
- cluster.TLS.Insecure = true
+ super.cluster.TLS.Insecure = true
}
if super.ClusterType == "test" {
// Add a second keepstore process.
@@ -756,13 +895,13 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config) error {
return err
}
host := net.JoinHostPort(super.ListenHost, port)
- cluster.Services.Keepstore.InternalURLs[arvados.URL{Scheme: "http", Host: host, Path: "/"}] = arvados.ServiceInstance{}
+ super.cluster.Services.Keepstore.InternalURLs[arvados.URL{Scheme: "http", Host: host, Path: "/"}] = arvados.ServiceInstance{}
// Create a directory-backed volume for each keepstore
// process.
- cluster.Volumes = map[string]arvados.Volume{}
- for url := range cluster.Services.Keepstore.InternalURLs {
- volnum := len(cluster.Volumes)
+ super.cluster.Volumes = map[string]arvados.Volume{}
+ for url := range super.cluster.Services.Keepstore.InternalURLs {
+ volnum := len(super.cluster.Volumes)
datadir := fmt.Sprintf("%s/keep%d.data", super.tempdir, volnum)
if _, err = os.Stat(datadir + "/."); err == nil {
} else if !os.IsNotExist(err) {
@@ -770,7 +909,7 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config) error {
} else if err = os.Mkdir(datadir, 0755); err != nil {
return err
}
- cluster.Volumes[fmt.Sprintf(cluster.ClusterID+"-nyw5e-%015d", volnum)] = arvados.Volume{
+ super.cluster.Volumes[fmt.Sprintf(super.cluster.ClusterID+"-nyw5e-%015d", volnum)] = arvados.Volume{
Driver: "Directory",
DriverParameters: json.RawMessage(fmt.Sprintf(`{"Root":%q}`, datadir)),
AccessViaHosts: map[arvados.URL]arvados.VolumeAccess{
@@ -783,7 +922,7 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config) error {
},
}
}
- cluster.StorageClasses = map[string]arvados.StorageClassConfig{
+ super.cluster.StorageClasses = map[string]arvados.StorageClassConfig{
"default": {Default: true},
"foo": {},
"bar": {},
@@ -794,7 +933,7 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config) error {
if err != nil {
return err
}
- cluster.PostgreSQL.Connection = arvados.PostgreSQLConnection{
+ super.cluster.PostgreSQL.Connection = arvados.PostgreSQLConnection{
"client_encoding": "utf8",
"host": "localhost",
"port": port,
@@ -803,8 +942,6 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config) error {
"password": "insecure_arvados_test",
}
}
-
- cfg.Clusters[cluster.ClusterID] = *cluster
return nil
}
@@ -876,6 +1013,7 @@ func availablePort(host string) (string, error) {
// Try to connect to addr until it works, then close ch. Give up if
// ctx cancels.
func waitForConnect(ctx context.Context, addr string) error {
+ ctxlog.FromContext(ctx).WithField("addr", addr).Info("waitForConnect")
dialer := net.Dialer{Timeout: time.Second}
for ctx.Err() == nil {
conn, err := dialer.DialContext(ctx, "tcp", addr)
diff --git a/lib/controller/integration_test.go b/lib/controller/integration_test.go
index 6ffbbd272..a0f0632ba 100644
--- a/lib/controller/integration_test.go
+++ b/lib/controller/integration_test.go
@@ -17,13 +17,12 @@ import (
"net/http"
"os"
"os/exec"
- "path/filepath"
"strconv"
"strings"
"sync"
+ "time"
"git.arvados.org/arvados.git/lib/boot"
- "git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
@@ -34,13 +33,11 @@ import (
var _ = check.Suite(&IntegrationSuite{})
type IntegrationSuite struct {
- testClusters map[string]*boot.TestCluster
+ super *boot.Supervisor
oidcprovider *arvadostest.OIDCProvider
}
func (s *IntegrationSuite) SetUpSuite(c *check.C) {
- cwd, _ := os.Getwd()
-
s.oidcprovider = arvadostest.NewOIDCProvider(c)
s.oidcprovider.AuthEmail = "user@example.com"
s.oidcprovider.AuthEmailVerified = true
@@ -48,13 +45,8 @@ func (s *IntegrationSuite) SetUpSuite(c *check.C) {
s.oidcprovider.ValidClientID = "clientid"
s.oidcprovider.ValidClientSecret = "clientsecret"
- s.testClusters = map[string]*boot.TestCluster{
- "z1111": nil,
- "z2222": nil,
- "z3333": nil,
- }
hostport := map[string]string{}
- for id := range s.testClusters {
+ for _, id := range []string{"z1111", "z2222", "z3333"} {
hostport[id] = func() string {
// TODO: Instead of expecting random ports on
// 127.0.0.11, 22, 33 to be race-safe, try
@@ -68,8 +60,9 @@ func (s *IntegrationSuite) SetUpSuite(c *check.C) {
return "127.0.0." + id[3:] + ":" + port
}()
}
- for id := range s.testClusters {
- yaml := `Clusters:
+ yaml := "Clusters:\n"
+ for id := range hostport {
+ yaml += `
` + id + `:
Services:
Controller:
@@ -124,37 +117,35 @@ func (s *IntegrationSuite) SetUpSuite(c *check.C) {
LoginCluster: z1111
`
}
-
- loader := config.NewLoader(bytes.NewBufferString(yaml), ctxlog.TestLogger(c))
- loader.Path = "-"
- loader.SkipLegacy = true
- loader.SkipAPICalls = true
- cfg, err := loader.Load()
- c.Assert(err, check.IsNil)
- tc := boot.NewTestCluster(
- filepath.Join(cwd, "..", ".."),
- id, cfg, "127.0.0."+id[3:], c.Log)
- tc.Super.NoWorkbench1 = true
- tc.Super.NoWorkbench2 = true
- tc.Start()
- s.testClusters[id] = tc
}
- for _, tc := range s.testClusters {
- ok := tc.WaitReady()
- c.Assert(ok, check.Equals, true)
+ s.super = &boot.Supervisor{
+ ClusterType: "test",
+ ConfigYAML: yaml,
+ Stderr: ctxlog.LogWriter(c.Log),
+ NoWorkbench1: true,
+ NoWorkbench2: true,
+ OwnTemporaryDatabase: true,
}
+
+ // Give up if startup takes longer than 3m
+ timeout := time.AfterFunc(3*time.Minute, s.super.Stop)
+ defer timeout.Stop()
+ s.super.Start(context.Background())
+ ok := s.super.WaitReady()
+ c.Assert(ok, check.Equals, true)
}
func (s *IntegrationSuite) TearDownSuite(c *check.C) {
- for _, c := range s.testClusters {
- c.Super.Stop()
+ if s.super != nil {
+ s.super.Stop()
+ s.super.Wait()
}
}
func (s *IntegrationSuite) TestDefaultStorageClassesOnCollections(c *check.C) {
- conn := s.testClusters["z1111"].Conn()
- rootctx, _, _ := s.testClusters["z1111"].RootClients()
- userctx, _, kc, _ := s.testClusters["z1111"].UserClients(rootctx, c, conn, s.oidcprovider.AuthEmail, true)
+ conn := s.super.Conn("z1111")
+ rootctx, _, _ := s.super.RootClients("z1111")
+ userctx, _, kc, _ := s.super.UserClients("z1111", rootctx, c, conn, s.oidcprovider.AuthEmail, true)
c.Assert(len(kc.DefaultStorageClasses) > 0, check.Equals, true)
coll, err := conn.CollectionCreate(userctx, arvados.CreateOptions{})
c.Assert(err, check.IsNil)
@@ -162,10 +153,10 @@ func (s *IntegrationSuite) TestDefaultStorageClassesOnCollections(c *check.C) {
}
func (s *IntegrationSuite) TestGetCollectionByPDH(c *check.C) {
- conn1 := s.testClusters["z1111"].Conn()
- rootctx1, _, _ := s.testClusters["z1111"].RootClients()
- conn3 := s.testClusters["z3333"].Conn()
- userctx1, ac1, kc1, _ := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
+ conn1 := s.super.Conn("z1111")
+ rootctx1, _, _ := s.super.RootClients("z1111")
+ conn3 := s.super.Conn("z3333")
+ userctx1, ac1, kc1, _ := s.super.UserClients("z1111", rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
// Create the collection to find its PDH (but don't save it
// anywhere yet)
@@ -201,11 +192,11 @@ func (s *IntegrationSuite) TestGetCollectionByPDH(c *check.C) {
// Tests bug #18004
func (s *IntegrationSuite) TestRemoteUserAndTokenCacheRace(c *check.C) {
- conn1 := s.testClusters["z1111"].Conn()
- rootctx1, _, _ := s.testClusters["z1111"].RootClients()
- rootctx2, _, _ := s.testClusters["z2222"].RootClients()
- conn2 := s.testClusters["z2222"].Conn()
- userctx1, _, _, _ := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, "user2@example.com", true)
+ conn1 := s.super.Conn("z1111")
+ rootctx1, _, _ := s.super.RootClients("z1111")
+ rootctx2, _, _ := s.super.RootClients("z2222")
+ conn2 := s.super.Conn("z2222")
+ userctx1, _, _, _ := s.super.UserClients("z1111", rootctx1, c, conn1, "user2@example.com", true)
var wg1, wg2 sync.WaitGroup
creqs := 100
@@ -250,13 +241,13 @@ func (s *IntegrationSuite) TestS3WithFederatedToken(c *check.C) {
testText := "IntegrationSuite.TestS3WithFederatedToken"
- conn1 := s.testClusters["z1111"].Conn()
- rootctx1, _, _ := s.testClusters["z1111"].RootClients()
- userctx1, ac1, _, _ := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
- conn3 := s.testClusters["z3333"].Conn()
+ conn1 := s.super.Conn("z1111")
+ rootctx1, _, _ := s.super.RootClients("z1111")
+ userctx1, ac1, _, _ := s.super.UserClients("z1111", rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
+ conn3 := s.super.Conn("z3333")
createColl := func(clusterID string) arvados.Collection {
- _, ac, kc := s.testClusters[clusterID].ClientsWithToken(ac1.AuthToken)
+ _, ac, kc := s.super.ClientsWithToken(clusterID, ac1.AuthToken)
var coll arvados.Collection
fs, err := coll.FileSystem(ac, kc)
c.Assert(err, check.IsNil)
@@ -268,7 +259,7 @@ func (s *IntegrationSuite) TestS3WithFederatedToken(c *check.C) {
c.Assert(err, check.IsNil)
mtxt, err := fs.MarshalManifest(".")
c.Assert(err, check.IsNil)
- coll, err = s.testClusters[clusterID].Conn().CollectionCreate(userctx1, arvados.CreateOptions{Attrs: map[string]interface{}{
+ coll, err = s.super.Conn(clusterID).CollectionCreate(userctx1, arvados.CreateOptions{Attrs: map[string]interface{}{
"manifest_text": mtxt,
}})
c.Assert(err, check.IsNil)
@@ -316,10 +307,10 @@ func (s *IntegrationSuite) TestS3WithFederatedToken(c *check.C) {
}
func (s *IntegrationSuite) TestGetCollectionAsAnonymous(c *check.C) {
- conn1 := s.testClusters["z1111"].Conn()
- conn3 := s.testClusters["z3333"].Conn()
- rootctx1, rootac1, rootkc1 := s.testClusters["z1111"].RootClients()
- anonctx3, anonac3, _ := s.testClusters["z3333"].AnonymousClients()
+ conn1 := s.super.Conn("z1111")
+ conn3 := s.super.Conn("z3333")
+ rootctx1, rootac1, rootkc1 := s.super.RootClients("z1111")
+ anonctx3, anonac3, _ := s.super.AnonymousClients("z3333")
// Make sure anonymous token was set
c.Assert(anonac3.AuthToken, check.Not(check.Equals), "")
@@ -368,7 +359,7 @@ func (s *IntegrationSuite) TestGetCollectionAsAnonymous(c *check.C) {
c.Check(err, check.IsNil)
// Make a v2 token of the z3 anonymous user, and use it on z1
- _, anonac1, _ := s.testClusters["z1111"].ClientsWithToken(outAuth.TokenV2())
+ _, anonac1, _ := s.super.ClientsWithToken("z1111", outAuth.TokenV2())
outUser2, err := anonac1.CurrentUser()
c.Check(err, check.IsNil)
// z3 anonymous user will be mapped to the z1 anonymous user
@@ -383,14 +374,14 @@ func (s *IntegrationSuite) TestGetCollectionAsAnonymous(c *check.C) {
// Get a token from the login cluster (z1111), use it to submit a
// container request on z2222.
func (s *IntegrationSuite) TestCreateContainerRequestWithFedToken(c *check.C) {
- conn1 := s.testClusters["z1111"].Conn()
- rootctx1, _, _ := s.testClusters["z1111"].RootClients()
- _, ac1, _, _ := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
+ conn1 := s.super.Conn("z1111")
+ rootctx1, _, _ := s.super.RootClients("z1111")
+ _, ac1, _, _ := s.super.UserClients("z1111", rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
// Use ac2 to get the discovery doc with a blank token, so the
// SDK doesn't magically pass the z1111 token to z2222 before
// we're ready to start our test.
- _, ac2, _ := s.testClusters["z2222"].ClientsWithToken("")
+ _, ac2, _ := s.super.ClientsWithToken("z2222", "")
var dd map[string]interface{}
err := ac2.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil)
c.Assert(err, check.IsNil)
@@ -452,9 +443,9 @@ func (s *IntegrationSuite) TestCreateContainerRequestWithFedToken(c *check.C) {
}
func (s *IntegrationSuite) TestCreateContainerRequestWithBadToken(c *check.C) {
- conn1 := s.testClusters["z1111"].Conn()
- rootctx1, _, _ := s.testClusters["z1111"].RootClients()
- _, ac1, _, au := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, "user@example.com", true)
+ conn1 := s.super.Conn("z1111")
+ rootctx1, _, _ := s.super.RootClients("z1111")
+ _, ac1, _, au := s.super.UserClients("z1111", rootctx1, c, conn1, "user@example.com", true)
tests := []struct {
name string
@@ -489,9 +480,9 @@ func (s *IntegrationSuite) TestCreateContainerRequestWithBadToken(c *check.C) {
}
func (s *IntegrationSuite) TestRequestIDHeader(c *check.C) {
- conn1 := s.testClusters["z1111"].Conn()
- rootctx1, _, _ := s.testClusters["z1111"].RootClients()
- userctx1, ac1, _, _ := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, "user@example.com", true)
+ conn1 := s.super.Conn("z1111")
+ rootctx1, _, _ := s.super.RootClients("z1111")
+ userctx1, ac1, _, _ := s.super.UserClients("z1111", rootctx1, c, conn1, "user@example.com", true)
coll, err := conn1.CollectionCreate(userctx1, arvados.CreateOptions{})
c.Check(err, check.IsNil)
@@ -563,7 +554,7 @@ func (s *IntegrationSuite) TestRequestIDHeader(c *check.C) {
// to test tokens that are secret, so there is no API response that will give them back
func (s *IntegrationSuite) dbConn(c *check.C, clusterID string) (*sql.DB, *sql.Conn) {
ctx := context.Background()
- db, err := sql.Open("postgres", s.testClusters[clusterID].Super.Cluster().PostgreSQL.Connection.String())
+ db, err := sql.Open("postgres", s.super.Cluster(clusterID).PostgreSQL.Connection.String())
c.Assert(err, check.IsNil)
conn, err := db.Conn(ctx)
@@ -583,9 +574,9 @@ func (s *IntegrationSuite) TestRuntimeTokenInCR(c *check.C) {
db, dbconn := s.dbConn(c, "z1111")
defer db.Close()
defer dbconn.Close()
- conn1 := s.testClusters["z1111"].Conn()
- rootctx1, _, _ := s.testClusters["z1111"].RootClients()
- userctx1, ac1, _, au := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, "user@example.com", true)
+ conn1 := s.super.Conn("z1111")
+ rootctx1, _, _ := s.super.RootClients("z1111")
+ userctx1, ac1, _, au := s.super.UserClients("z1111", rootctx1, c, conn1, "user@example.com", true)
tests := []struct {
name string
@@ -635,9 +626,9 @@ func (s *IntegrationSuite) TestRuntimeTokenInCR(c *check.C) {
// one cluster with another cluster as the destination
// and check the tokens are being handled properly
func (s *IntegrationSuite) TestIntermediateCluster(c *check.C) {
- conn1 := s.testClusters["z1111"].Conn()
- rootctx1, _, _ := s.testClusters["z1111"].RootClients()
- uctx1, ac1, _, _ := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, "user@example.com", true)
+ conn1 := s.super.Conn("z1111")
+ rootctx1, _, _ := s.super.RootClients("z1111")
+ uctx1, ac1, _, _ := s.super.UserClients("z1111", rootctx1, c, conn1, "user@example.com", true)
tests := []struct {
name string
@@ -667,20 +658,20 @@ func (s *IntegrationSuite) TestIntermediateCluster(c *check.C) {
// Test for #17785
func (s *IntegrationSuite) TestFederatedApiClientAuthHandling(c *check.C) {
- rootctx1, rootclnt1, _ := s.testClusters["z1111"].RootClients()
- conn1 := s.testClusters["z1111"].Conn()
+ rootctx1, rootclnt1, _ := s.super.RootClients("z1111")
+ conn1 := s.super.Conn("z1111")
// Make sure LoginCluster is properly configured
for _, cls := range []string{"z1111", "z3333"} {
c.Check(
- s.testClusters[cls].Config.Clusters[cls].Login.LoginCluster,
+ s.super.Cluster(cls).Login.LoginCluster,
check.Equals, "z1111",
check.Commentf("incorrect LoginCluster config on cluster %q", cls))
}
// Get user's UUID & attempt to create a token for it on the remote cluster
- _, _, _, user := s.testClusters["z1111"].UserClients(rootctx1, c, conn1,
+ _, _, _, user := s.super.UserClients("z1111", rootctx1, c, conn1,
"user@example.com", true)
- _, rootclnt3, _ := s.testClusters["z3333"].ClientsWithToken(rootclnt1.AuthToken)
+ _, rootclnt3, _ := s.super.ClientsWithToken("z3333", rootclnt1.AuthToken)
var resp arvados.APIClientAuthorization
err := rootclnt3.RequestAndDecode(
&resp, "POST", "arvados/v1/api_client_authorizations", nil,
@@ -699,7 +690,7 @@ func (s *IntegrationSuite) TestFederatedApiClientAuthHandling(c *check.C) {
c.Assert(strings.HasPrefix(newTok, "v2/z1111-gj3su-"), check.Equals, true)
// Confirm the token works and is from the correct user
- _, rootclnt3bis, _ := s.testClusters["z3333"].ClientsWithToken(newTok)
+ _, rootclnt3bis, _ := s.super.ClientsWithToken("z3333", newTok)
var curUser arvados.User
err = rootclnt3bis.RequestAndDecode(
&curUser, "GET", "arvados/v1/users/current", nil, nil,
@@ -708,7 +699,7 @@ func (s *IntegrationSuite) TestFederatedApiClientAuthHandling(c *check.C) {
c.Assert(curUser.UUID, check.Equals, user.UUID)
// Request the ApiClientAuthorization list using the new token
- _, userClient, _ := s.testClusters["z3333"].ClientsWithToken(newTok)
+ _, userClient, _ := s.super.ClientsWithToken("z3333", newTok)
var acaLst arvados.APIClientAuthorizationList
err = userClient.RequestAndDecode(
&acaLst, "GET", "arvados/v1/api_client_authorizations", nil, nil,
@@ -718,15 +709,15 @@ func (s *IntegrationSuite) TestFederatedApiClientAuthHandling(c *check.C) {
// Test for bug #18076
func (s *IntegrationSuite) TestStaleCachedUserRecord(c *check.C) {
- rootctx1, _, _ := s.testClusters["z1111"].RootClients()
- _, rootclnt3, _ := s.testClusters["z3333"].RootClients()
- conn1 := s.testClusters["z1111"].Conn()
- conn3 := s.testClusters["z3333"].Conn()
+ rootctx1, _, _ := s.super.RootClients("z1111")
+ _, rootclnt3, _ := s.super.RootClients("z3333")
+ conn1 := s.super.Conn("z1111")
+ conn3 := s.super.Conn("z3333")
// Make sure LoginCluster is properly configured
for _, cls := range []string{"z1111", "z3333"} {
c.Check(
- s.testClusters[cls].Config.Clusters[cls].Login.LoginCluster,
+ s.super.Cluster(cls).Login.LoginCluster,
check.Equals, "z1111",
check.Commentf("incorrect LoginCluster config on cluster %q", cls))
}
@@ -742,7 +733,7 @@ func (s *IntegrationSuite) TestStaleCachedUserRecord(c *check.C) {
// Create some users, request them on the federated cluster so they're cached.
var users []arvados.User
for userNr := 0; userNr < 2; userNr++ {
- _, _, _, user := s.testClusters["z1111"].UserClients(
+ _, _, _, user := s.super.UserClients("z1111",
rootctx1,
c,
conn1,
@@ -821,15 +812,15 @@ func (s *IntegrationSuite) TestStaleCachedUserRecord(c *check.C) {
// Test for bug #16263
func (s *IntegrationSuite) TestListUsers(c *check.C) {
- rootctx1, _, _ := s.testClusters["z1111"].RootClients()
- conn1 := s.testClusters["z1111"].Conn()
- conn3 := s.testClusters["z3333"].Conn()
- userctx1, _, _, _ := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
+ rootctx1, _, _ := s.super.RootClients("z1111")
+ conn1 := s.super.Conn("z1111")
+ conn3 := s.super.Conn("z3333")
+ userctx1, _, _, _ := s.super.UserClients("z1111", rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
// Make sure LoginCluster is properly configured
- for cls := range s.testClusters {
+ for _, cls := range []string{"z1111", "z2222", "z3333"} {
c.Check(
- s.testClusters[cls].Config.Clusters[cls].Login.LoginCluster,
+ s.super.Cluster(cls).Login.LoginCluster,
check.Equals, "z1111",
check.Commentf("incorrect LoginCluster config on cluster %q", cls))
}
@@ -889,12 +880,12 @@ func (s *IntegrationSuite) TestListUsers(c *check.C) {
}
func (s *IntegrationSuite) TestSetupUserWithVM(c *check.C) {
- conn1 := s.testClusters["z1111"].Conn()
- conn3 := s.testClusters["z3333"].Conn()
- rootctx1, rootac1, _ := s.testClusters["z1111"].RootClients()
+ conn1 := s.super.Conn("z1111")
+ conn3 := s.super.Conn("z3333")
+ rootctx1, rootac1, _ := s.super.RootClients("z1111")
// Create user on LoginCluster z1111
- _, _, _, user := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
+ _, _, _, user := s.super.UserClients("z1111", rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
// Make a new root token (because rootClients() uses SystemRootToken)
var outAuth arvados.APIClientAuthorization
@@ -902,7 +893,7 @@ func (s *IntegrationSuite) TestSetupUserWithVM(c *check.C) {
c.Check(err, check.IsNil)
// Make a v2 root token to communicate with z3333
- rootctx3, rootac3, _ := s.testClusters["z3333"].ClientsWithToken(outAuth.TokenV2())
+ rootctx3, rootac3, _ := s.super.ClientsWithToken("z3333", outAuth.TokenV2())
// Create VM on z3333
var outVM arvados.VirtualMachine
@@ -953,9 +944,9 @@ func (s *IntegrationSuite) TestSetupUserWithVM(c *check.C) {
}
func (s *IntegrationSuite) TestOIDCAccessTokenAuth(c *check.C) {
- conn1 := s.testClusters["z1111"].Conn()
- rootctx1, _, _ := s.testClusters["z1111"].RootClients()
- s.testClusters["z1111"].UserClients(rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
+ conn1 := s.super.Conn("z1111")
+ rootctx1, _, _ := s.super.RootClients("z1111")
+ s.super.UserClients("z1111", rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
accesstoken := s.oidcprovider.ValidAccessToken()
@@ -967,8 +958,8 @@ func (s *IntegrationSuite) TestOIDCAccessTokenAuth(c *check.C) {
{
c.Logf("save collection to %s", clusterID)
- conn := s.testClusters[clusterID].Conn()
- ctx, ac, kc := s.testClusters[clusterID].ClientsWithToken(accesstoken)
+ conn := s.super.Conn(clusterID)
+ ctx, ac, kc := s.super.ClientsWithToken(clusterID, accesstoken)
fs, err := coll.FileSystem(ac, kc)
c.Assert(err, check.IsNil)
@@ -992,8 +983,8 @@ func (s *IntegrationSuite) TestOIDCAccessTokenAuth(c *check.C) {
for _, readClusterID := range []string{"z1111", "z2222", "z3333"} {
c.Logf("retrieve %s from %s", coll.UUID, readClusterID)
- conn := s.testClusters[readClusterID].Conn()
- ctx, ac, kc := s.testClusters[readClusterID].ClientsWithToken(accesstoken)
+ conn := s.super.Conn(readClusterID)
+ ctx, ac, kc := s.super.ClientsWithToken(readClusterID, accesstoken)
user, err := conn.UserGetCurrent(ctx, arvados.GetOptions{})
c.Assert(err, check.IsNil)
@@ -1021,11 +1012,11 @@ func (s *IntegrationSuite) TestForwardRuntimeTokenToLoginCluster(c *check.C) {
db3, db3conn := s.dbConn(c, "z3333")
defer db3.Close()
defer db3conn.Close()
- rootctx1, _, _ := s.testClusters["z1111"].RootClients()
- rootctx3, _, _ := s.testClusters["z3333"].RootClients()
- conn1 := s.testClusters["z1111"].Conn()
- conn3 := s.testClusters["z3333"].Conn()
- userctx1, _, _, _ := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, "user@example.com", true)
+ rootctx1, _, _ := s.super.RootClients("z1111")
+ rootctx3, _, _ := s.super.RootClients("z3333")
+ conn1 := s.super.Conn("z1111")
+ conn3 := s.super.Conn("z3333")
+ userctx1, _, _, _ := s.super.UserClients("z1111", rootctx1, c, conn1, "user@example.com", true)
user1, err := conn1.UserGetCurrent(userctx1, arvados.GetOptions{})
c.Assert(err, check.IsNil)
@@ -1063,7 +1054,7 @@ func (s *IntegrationSuite) TestForwardRuntimeTokenToLoginCluster(c *check.C) {
row.Scan(&val)
c.Assert(val.Valid, check.Equals, true)
runtimeToken := "v2/" + ctr.AuthUUID + "/" + val.String
- ctrctx, _, _ := s.testClusters["z3333"].ClientsWithToken(runtimeToken)
+ ctrctx, _, _ := s.super.ClientsWithToken("z3333", runtimeToken)
c.Logf("container runtime token %+v", runtimeToken)
_, err = conn3.UserGet(ctrctx, arvados.GetOptions{UUID: user1.UUID})
diff --git a/tools/sync-groups/federation_test.go b/tools/sync-groups/federation_test.go
index d5fed3e29..71c119156 100644
--- a/tools/sync-groups/federation_test.go
+++ b/tools/sync-groups/federation_test.go
@@ -5,13 +5,12 @@
package main
import (
- "bytes"
+ "context"
"net"
"os"
- "path/filepath"
+ "time"
"git.arvados.org/arvados.git/lib/boot"
- "git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
@@ -23,7 +22,7 @@ var _ = check.Suite(&FederationSuite{})
var origAPIHost, origAPIToken string
type FederationSuite struct {
- testClusters map[string]*boot.TestCluster
+ super *boot.Supervisor
oidcprovider *arvadostest.OIDCProvider
}
@@ -31,8 +30,6 @@ func (s *FederationSuite) SetUpSuite(c *check.C) {
origAPIHost = os.Getenv("ARVADOS_API_HOST")
origAPIToken = os.Getenv("ARVADOS_API_TOKEN")
- cwd, _ := os.Getwd()
-
s.oidcprovider = arvadostest.NewOIDCProvider(c)
s.oidcprovider.AuthEmail = "user@example.com"
s.oidcprovider.AuthEmailVerified = true
@@ -40,12 +37,8 @@ func (s *FederationSuite) SetUpSuite(c *check.C) {
s.oidcprovider.ValidClientID = "clientid"
s.oidcprovider.ValidClientSecret = "clientsecret"
- s.testClusters = map[string]*boot.TestCluster{
- "z1111": nil,
- "z2222": nil,
- }
hostport := map[string]string{}
- for id := range s.testClusters {
+ for _, id := range []string{"z1111", "z2222"} {
hostport[id] = func() string {
// TODO: Instead of expecting random ports on
// 127.0.0.11, 22 to be race-safe, try
@@ -59,8 +52,9 @@ func (s *FederationSuite) SetUpSuite(c *check.C) {
return "127.0.0." + id[3:] + ":" + port
}()
}
- for id := range s.testClusters {
- yaml := `Clusters:
+ yaml := "Clusters:\n"
+ for id := range hostport {
+ yaml += `
` + id + `:
Services:
Controller:
@@ -104,30 +98,27 @@ func (s *FederationSuite) SetUpSuite(c *check.C) {
LoginCluster: z1111
`
}
-
- loader := config.NewLoader(bytes.NewBufferString(yaml), ctxlog.TestLogger(c))
- loader.Path = "-"
- loader.SkipLegacy = true
- loader.SkipAPICalls = true
- cfg, err := loader.Load()
- c.Assert(err, check.IsNil)
- tc := boot.NewTestCluster(
- filepath.Join(cwd, "..", ".."),
- id, cfg, "127.0.0."+id[3:], c.Log)
- tc.Super.NoWorkbench1 = true
- tc.Super.NoWorkbench2 = true
- tc.Start()
- s.testClusters[id] = tc
}
- for _, tc := range s.testClusters {
- ok := tc.WaitReady()
- c.Assert(ok, check.Equals, true)
+ s.super = &boot.Supervisor{
+ ClusterType: "test",
+ ConfigYAML: yaml,
+ Stderr: ctxlog.LogWriter(c.Log),
+ NoWorkbench1: true,
+ NoWorkbench2: true,
+ OwnTemporaryDatabase: true,
}
+ // Give up if startup takes longer than 3m
+ timeout := time.AfterFunc(3*time.Minute, s.super.Stop)
+ defer timeout.Stop()
+ s.super.Start(context.Background())
+ ok := s.super.WaitReady()
+ c.Assert(ok, check.Equals, true)
+
// Activate user, make it admin.
- conn1 := s.testClusters["z1111"].Conn()
- rootctx1, _, _ := s.testClusters["z1111"].RootClients()
- userctx1, _, _, _ := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
+ conn1 := s.super.Conn("z1111")
+ rootctx1, _, _ := s.super.RootClients("z1111")
+ userctx1, _, _, _ := s.super.UserClients("z1111", rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
user1, err := conn1.UserGetCurrent(userctx1, arvados.GetOptions{})
c.Assert(err, check.IsNil)
c.Assert(user1.IsAdmin, check.Equals, false)
@@ -142,25 +133,23 @@ func (s *FederationSuite) SetUpSuite(c *check.C) {
}
func (s *FederationSuite) TearDownSuite(c *check.C) {
- for _, c := range s.testClusters {
- c.Super.Stop()
- }
+ s.super.Stop()
_ = os.Setenv("ARVADOS_API_HOST", origAPIHost)
_ = os.Setenv("ARVADOS_API_TOKEN", origAPIToken)
}
func (s *FederationSuite) TestGroupSyncingOnFederatedCluster(c *check.C) {
// Get admin user's V2 token
- conn1 := s.testClusters["z1111"].Conn()
- rootctx1, _, _ := s.testClusters["z1111"].RootClients()
- userctx1, _, _, _ := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
+ conn1 := s.super.Conn("z1111")
+ rootctx1, _, _ := s.super.RootClients("z1111")
+ userctx1, _, _, _ := s.super.UserClients("z1111", rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
user1Auth, err := conn1.APIClientAuthorizationCurrent(userctx1, arvados.GetOptions{})
c.Check(err, check.IsNil)
userV2Token := user1Auth.TokenV2()
// Get federated admin clients on z2222 to set up environment
- conn2 := s.testClusters["z2222"].Conn()
- userctx2, userac2, _ := s.testClusters["z2222"].ClientsWithToken(userV2Token)
+ conn2 := s.super.Conn("z2222")
+ userctx2, userac2, _ := s.super.ClientsWithToken("z2222", userV2Token)
user2, err := conn2.UserGetCurrent(userctx2, arvados.GetOptions{})
c.Check(err, check.IsNil)
c.Check(user2.IsAdmin, check.Equals, true)
@@ -177,7 +166,7 @@ func (s *FederationSuite) TestGroupSyncingOnFederatedCluster(c *check.C) {
Filters: []arvados.Filter{{
Attr: "owner_uuid",
Operator: "=",
- Operand: s.testClusters["z2222"].ClusterID + "-tpzed-000000000000000",
+ Operand: s.super.Cluster("z2222").ClusterID + "-tpzed-000000000000000",
}, {
Attr: "name",
Operator: "=",
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list