[arvados] created: 2.5.0-1-g194bd77e0
git repository hosting
git at public.arvados.org
Thu Dec 22 21:42:04 UTC 2022
at 194bd77e028a9082e6f252974fcee2dfbaed305f (commit)
commit 194bd77e028a9082e6f252974fcee2dfbaed305f
Author: Tom Clegg <tom at curii.com>
Date: Thu Dec 22 16:39:03 2022 -0500
19564: Add AutoReloadConfig support for individual services.
Previously, AutoReloadConfig only applied to services running under
"arvados-server boot".
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/boot/supervisor.go b/lib/boot/supervisor.go
index 0f0600f18..3b4602114 100644
--- a/lib/boot/supervisor.go
+++ b/lib/boot/supervisor.go
@@ -20,7 +20,6 @@ import (
"os/signal"
"os/user"
"path/filepath"
- "reflect"
"strconv"
"strings"
"sync"
@@ -32,7 +31,6 @@ import (
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/health"
- "github.com/fsnotify/fsnotify"
"github.com/sirupsen/logrus"
)
@@ -153,13 +151,17 @@ func (super *Supervisor) Start(ctx context.Context) {
return
}
- if super.ConfigPath != "" && super.ConfigPath != "-" && cfg.AutoReloadConfig {
- go watchConfig(super.ctx, super.logger, super.ConfigPath, copyConfig(cfg), func() {
- if super.err == nil {
- super.err = errNeedConfigReload
- }
- super.cancel()
- })
+ err = loader.WatchConfig(super.ctx, func() {
+ if super.err == nil {
+ super.err = errNeedConfigReload
+ }
+ super.cancel()
+ })
+ if err != nil {
+ super.err = err
+ close(super.done)
+ super.cancel()
+ return
}
if len(cfg.Clusters) > 1 {
@@ -288,6 +290,7 @@ func (super *Supervisor) runCluster() error {
}
defer conffile.Close()
err = json.NewEncoder(conffile).Encode(arvados.Config{
+ AutoReloadConfig: false,
Clusters: map[string]arvados.Cluster{
super.cluster.ClusterID: *super.cluster}})
if err != nil {
@@ -1031,67 +1034,3 @@ func waitForConnect(ctx context.Context, addr string) error {
}
return ctx.Err()
}
-
-func copyConfig(cfg *arvados.Config) *arvados.Config {
- pr, pw := io.Pipe()
- go func() {
- err := json.NewEncoder(pw).Encode(cfg)
- if err != nil {
- panic(err)
- }
- pw.Close()
- }()
- cfg2 := new(arvados.Config)
- err := json.NewDecoder(pr).Decode(cfg2)
- if err != nil {
- panic(err)
- }
- return cfg2
-}
-
-func watchConfig(ctx context.Context, logger logrus.FieldLogger, cfgPath string, prevcfg *arvados.Config, fn func()) {
- watcher, err := fsnotify.NewWatcher()
- if err != nil {
- logger.WithError(err).Error("fsnotify setup failed")
- return
- }
- defer watcher.Close()
-
- err = watcher.Add(cfgPath)
- if err != nil {
- logger.WithError(err).Error("fsnotify watcher failed")
- return
- }
-
- for {
- select {
- case <-ctx.Done():
- return
- case err, ok := <-watcher.Errors:
- if !ok {
- return
- }
- logger.WithError(err).Warn("fsnotify watcher reported error")
- case _, ok := <-watcher.Events:
- if !ok {
- return
- }
- for len(watcher.Events) > 0 {
- <-watcher.Events
- }
- loader := config.NewLoader(&bytes.Buffer{}, &logrus.Logger{Out: ioutil.Discard})
- loader.Path = cfgPath
- loader.SkipAPICalls = true
- cfg, err := loader.Load()
- if err != nil {
- logger.WithError(err).Warn("error reloading config file after change detected; ignoring new config for now")
- } else if reflect.DeepEqual(cfg, prevcfg) {
- logger.Debug("config file changed but is still DeepEqual to the existing config")
- } else {
- logger.Debug("config changed, notifying supervisor")
- fn()
- prevcfg = cfg
- }
- }
- }
-}
diff --git a/lib/config/load.go b/lib/config/load.go
index 9269ddf27..4ded8aa1a 100644
--- a/lib/config/load.go
+++ b/lib/config/load.go
@@ -198,6 +198,10 @@ func (ldr *Loader) loadBytes(path string) (buf []byte, sourceTime, loadTime time
return
}
+func (ldr *Loader) Unload() {
+ ldr.configdata = nil
+}
+
func (ldr *Loader) Load() (*arvados.Config, error) {
if ldr.configdata == nil {
buf, sourceTime, loadTime, err := ldr.loadBytes(ldr.Path)
diff --git a/lib/config/watch.go b/lib/config/watch.go
new file mode 100644
index 000000000..3d6c6d7fc
--- /dev/null
+++ b/lib/config/watch.go
@@ -0,0 +1,142 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package config
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "io/ioutil"
+ _ "net/http/pprof"
+ "reflect"
+ "time"
+
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "github.com/fsnotify/fsnotify"
+ "github.com/sirupsen/logrus"
+)
+
+// WatchConfig starts a goroutine that calls fn when the config file
+// changes in such a way that the new content is loadable and
+// semantically different from the previous config.
+//
+// After fn is called, the next call to Load() will return the updated
+// config.
+//
+// If ldr.Path is empty or "-", or the config has AutoReloadConfig
+// turned off, then WatchConfig does nothing.
+func (ldr *Loader) WatchConfig(ctx context.Context, fn func()) error {
+ if ldr.Path == "" || ldr.Path == "-" {
+ ldr.Logger.Debugf("AutoReloadConfig is disabled because config %q is not a regular file", ldr.Path)
+ return nil
+ }
+ cfg, err := ldr.Load()
+ if err != nil {
+ return err
+ }
+ if !cfg.AutoReloadConfig {
+ ldr.Logger.Debug("AutoReloadConfig is disabled, not watching config")
+ return nil
+ }
+ var copyerr error
+ pr, pw := io.Pipe()
+ go func() {
+ err := json.NewEncoder(pw).Encode(cfg)
+ if err != nil {
+ copyerr = err
+ }
+ err = pw.Close()
+ if copyerr == nil {
+ copyerr = err
+ }
+ }()
+ cfg2 := new(arvados.Config)
+ err = json.NewDecoder(pr).Decode(cfg2)
+ if err != nil {
+ return fmt.Errorf("error copying config: %w", err)
+ }
+ err = pr.Close()
+ if err == nil {
+ err = copyerr
+ }
+ if err != nil {
+ return fmt.Errorf("error copying config: %w", err)
+ }
+ watcher, err := fsnotify.NewWatcher()
+ if err != nil {
+ return fmt.Errorf("fsnotify setup failed: %w", err)
+ }
+ go watchConfig(ctx, ldr.Logger, ldr.Path, cfg2, watcher, func(configdata []byte) {
+ ldr.configdata = configdata
+ fn()
+ })
+ return nil
+}
+
+func watchConfig(ctx context.Context, logger logrus.FieldLogger, cfgPath string, prevcfg *arvados.Config, watcher *fsnotify.Watcher, fn func([]byte)) {
+ defer watcher.Close()
+ rewatch := func() {
+ watcher.Remove(cfgPath)
+ for delay := time.Second / 10; ; {
+ err := watcher.Add(cfgPath)
+ if err != nil {
+ logger.WithError(err).WithField("file", cfgPath).Warn("fsnotify watch failed")
+ time.Sleep(delay)
+ if delay < time.Minute {
+ delay = delay * 2
+ }
+ continue
+ }
+ break
+ }
+ }
+ rewatch()
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case err, ok := <-watcher.Errors:
+ if !ok {
+ return
+ }
+ logger.WithError(err).Warn("fsnotify watcher reported error")
+ case _, ok := <-watcher.Events:
+ if !ok {
+ return
+ }
+ for len(watcher.Events) > 0 {
+ <-watcher.Events
+ }
+
+ // We remove and re-add our watcher here so
+ // that, if someone renames a new config file
+ // into place (as they should), we receive
+ // events about the *new* file.
+ //
+ // Setting up the watcher here (before reading
+ // the new file) ensures we will get notified
+ // in the next loop iteration if the new file
+ // is changed or replaced before we even
+ // finish reading it.
+ rewatch()
+
+ loader := NewLoader(&bytes.Buffer{}, &logrus.Logger{Out: ioutil.Discard})
+ loader.Path = cfgPath
+ loader.SkipAPICalls = true
+ cfg, err := loader.Load()
+ if err != nil {
+ logger.WithError(err).Warn("error reloading config file after change detected; ignoring new config for now")
+ } else if reflect.DeepEqual(cfg, prevcfg) {
+ logger.Debug("config file changed but is still DeepEqual to the existing config")
+ } else {
+ logger.Debug("config changed, notifying")
+ fn(loader.configdata)
+ prevcfg = cfg
+ }
+ }
+ }
+}
diff --git a/lib/service/cmd.go b/lib/service/cmd.go
index 20441c2a6..e950f05c8 100644
--- a/lib/service/cmd.go
+++ b/lib/service/cmd.go
@@ -103,33 +103,63 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
loader.SkipAPICalls = true
}
- cfg, err := loader.Load()
- if err != nil {
- return 1
- }
- cluster, err := cfg.GetCluster("")
+ wantReload := make(chan struct{}, 1)
+ err = loader.WatchConfig(c.ctx, func() {
+ wantReload <- struct{}{}
+ })
if err != nil {
+ log.WithError(err).Error("exiting")
return 1
}
- // Now that we've read the config, replace the bootstrap
- // logger with a new one according to the logging config.
- log = ctxlog.New(stderr, cluster.SystemLogs.Format, cluster.SystemLogs.LogLevel)
- logger := log.WithFields(logrus.Fields{
- "PID": os.Getpid(),
- "ClusterID": cluster.ClusterID,
- })
- ctx := ctxlog.Context(c.ctx, logger)
+ for {
+ cfg, err := loader.Load()
+ if err != nil {
+ log.WithError(err).Error("exiting")
+ return 1
+ }
+ cluster, err := cfg.GetCluster("")
+ if err != nil {
+ log.WithError(err).Error("exiting")
+ return 1
+ }
+
+ // Now that we've read the config, replace the bootstrap
+ // logger with a new one according to the logging config.
+ baseLogger := ctxlog.New(stderr, cluster.SystemLogs.Format, cluster.SystemLogs.LogLevel)
+ logger := baseLogger.WithFields(logrus.Fields{
+ "PID": os.Getpid(),
+ "ClusterID": cluster.ClusterID,
+ })
+ ctx := ctxlog.Context(c.ctx, logger)
+ loader.Logger = logger
+
+ reg := prometheus.NewRegistry()
+ loader.RegisterMetrics(reg)
- listenURL, internalURL, err := getListenAddr(cluster.Services, c.svcName, log)
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+ go func() {
+ <-wantReload
+ cancel()
+ }()
+ err = c.run(ctx, cluster, baseLogger, logger, reg)
+ if err != nil && ctx.Err() == nil {
+ // run returned an error, and we aren't trying
+ // to cancel/reload -- something went wrong
+ log.WithError(err).Error("exiting")
+ return 1
+ }
+ }
+}
+
+func (c *command) run(ctx context.Context, cluster *arvados.Cluster, baseLogger *logrus.Logger, logger logrus.FieldLogger, reg *prometheus.Registry) error {
+ listenURL, internalURL, err := getListenAddr(cluster.Services, c.svcName, logger)
if err != nil {
- return 1
+ return err
}
ctx = context.WithValue(ctx, contextKeyURL{}, internalURL)
- reg := prometheus.NewRegistry()
- loader.RegisterMetrics(reg)
-
// arvados_version_running{version="1.2.3~4"} 1.0
mVersion := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "arvados",
@@ -140,11 +170,11 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
reg.MustRegister(mVersion)
handler := c.newHandler(ctx, cluster, cluster.SystemRootToken, reg)
- if err = handler.CheckHealth(); err != nil {
- return 1
+ if err := handler.CheckHealth(); err != nil {
+ return err
}
- instrumented := httpserver.Instrument(reg, log,
+ instrumented := httpserver.Instrument(reg, baseLogger,
httpserver.HandlerWithDeadline(cluster.API.RequestTimeout.Duration(),
httpserver.AddRequestIDs(
httpserver.Inspect(reg, cluster.ManagementToken,
@@ -162,13 +192,13 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
tlsconfig, err := makeTLSConfig(cluster, logger)
if err != nil {
logger.WithError(err).Errorf("cannot start %s service on %s", c.svcName, listenURL.String())
- return 1
+ return err
}
srv.TLSConfig = tlsconfig
}
err = srv.Start()
if err != nil {
- return 1
+ return err
}
logger.WithFields(logrus.Fields{
"URL": listenURL,
@@ -191,9 +221,9 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
}()
err = srv.Wait()
if err != nil {
- return 1
+ return err
}
- return 0
+ return nil
}
// If an incoming request's target vhost has an embedded collection
diff --git a/lib/service/cmd_test.go b/lib/service/cmd_test.go
index 7db910927..4e145f8d5 100644
--- a/lib/service/cmd_test.go
+++ b/lib/service/cmd_test.go
@@ -10,11 +10,13 @@ import (
"context"
"crypto/tls"
"fmt"
+ "io"
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
+ "strings"
"testing"
"time"
@@ -273,3 +275,86 @@ func (th *testHandler) CheckHealth() error {
}
return nil
}
+
+func (*Suite) TestAutoReloadConfig(c *check.C) {
+ host := "127.0.0.1"
+ ln, err := net.Listen("tcp", net.JoinHostPort(host, "0"))
+ c.Assert(err, check.IsNil)
+ hostport := ln.Addr().String()
+ ln.Close()
+
+ cf, err := ioutil.TempFile("", "cmd_test.")
+ c.Assert(err, check.IsNil)
+ defer os.Remove(cf.Name())
+ defer cf.Close()
+ fmt.Fprintf(cf, `
+AutoReloadConfig: true
+Clusters:
+ zzzzz:
+ SystemRootToken: abcde
+ ManagementToken: firsttoken
+ Services:
+ Controller:
+ InternalURLs:
+ "http://`+hostport+`": {}
+ ExternalURL: https://zzzzz.example.com/
+`)
+ cf.Close()
+
+ serviceStartedWithToken := make(chan string)
+ serviceContextClosed := make(chan bool)
+ healthCheck := make(chan bool, 1)
+ cmd := Command(arvados.ServiceNameController, func(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) Handler {
+ serviceStartedWithToken <- cluster.ManagementToken
+ go func() {
+ <-ctx.Done()
+ serviceContextClosed <- true
+ }()
+ return &testHandler{ctx: ctx, healthCheck: healthCheck, handler: http.NotFoundHandler()}
+ })
+
+ testFinished := false
+ defer func() { testFinished = true }()
+ var stdin, stdout, stderr bytes.Buffer
+ go func() {
+ cmd.RunCommand("arvados-controller", []string{"-config", cf.Name()}, &stdin, &stdout, io.MultiWriter(&stderr, ctxlog.LogWriter(c.Log)))
+ if !testFinished {
+ panic("command exited before test finished")
+ }
+ }()
+
+ c.Check(<-serviceStartedWithToken, check.Equals, "firsttoken")
+ <-healthCheck
+ os.WriteFile(cf.Name()+".2", []byte(`Clusters: {zzzzz: {oops`), 0777)
+ os.Rename(cf.Name()+".2", cf.Name())
+ for deadline := time.Now().Add(time.Second); ; time.Sleep(time.Millisecond) {
+ if time.Now().After(deadline) {
+ c.Log("timed out waiting for warning about bad config file")
+ c.FailNow()
+ }
+ if strings.Contains(stderr.String(), "error reloading config file") {
+ break
+ }
+ }
+
+ os.WriteFile(cf.Name(), []byte(`
+AutoReloadConfig: true
+Clusters:
+ zzzzz:
+ SystemRootToken: abcde
+ ManagementToken: secondtoken
+ Services:
+ Controller:
+ InternalURLs:
+ "http://`+hostport+`": {}
+ ExternalURL: https://zzzzz.example.com/
+`), 0777)
+ <-serviceContextClosed
+ c.Check(<-serviceStartedWithToken, check.Equals, "secondtoken")
+ <-healthCheck
+ // Check that the new/restarted service successfully listens
+ // on the same/reused port that the first one was using.
+ resp, err := http.Get("http://" + hostport + "/")
+ c.Check(err, check.IsNil)
+ c.Check(resp.StatusCode, check.Equals, http.StatusNotFound)
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list