[arvados] created: 2.5.0-1-g194bd77e0

git repository hosting git at public.arvados.org
Thu Dec 22 21:42:04 UTC 2022

        at  194bd77e028a9082e6f252974fcee2dfbaed305f (commit)

commit 194bd77e028a9082e6f252974fcee2dfbaed305f
Author: Tom Clegg <tom at curii.com>
Date:   Thu Dec 22 16:39:03 2022 -0500

    19564: Add AutoReloadConfig support for individual services.
    Previously, AutoReloadConfig only applied to services running under
    "arvados-server boot".
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/boot/supervisor.go b/lib/boot/supervisor.go
index 0f0600f18..3b4602114 100644
--- a/lib/boot/supervisor.go
+++ b/lib/boot/supervisor.go
@@ -20,7 +20,6 @@ import (
-	"reflect"
@@ -32,7 +31,6 @@ import (
-	"github.com/fsnotify/fsnotify"
@@ -153,13 +151,17 @@ func (super *Supervisor) Start(ctx context.Context) {
-	if super.ConfigPath != "" && super.ConfigPath != "-" && cfg.AutoReloadConfig {
-		go watchConfig(super.ctx, super.logger, super.ConfigPath, copyConfig(cfg), func() {
-			if super.err == nil {
-				super.err = errNeedConfigReload
-			}
-			super.cancel()
-		})
+	err = loader.WatchConfig(super.ctx, func() {
+		if super.err == nil {
+			super.err = errNeedConfigReload
+		}
+		super.cancel()
+	})
+	if err != nil {
+		super.err = err
+		close(super.done)
+		super.cancel()
+		return
 	if len(cfg.Clusters) > 1 {
@@ -288,6 +290,7 @@ func (super *Supervisor) runCluster() error {
 	defer conffile.Close()
 	err = json.NewEncoder(conffile).Encode(arvados.Config{
+		AutoReloadConfig: false,
 		Clusters: map[string]arvados.Cluster{
 			super.cluster.ClusterID: *super.cluster}})
 	if err != nil {
@@ -1031,67 +1034,3 @@ func waitForConnect(ctx context.Context, addr string) error {
 	return ctx.Err()
-func copyConfig(cfg *arvados.Config) *arvados.Config {
-	pr, pw := io.Pipe()
-	go func() {
-		err := json.NewEncoder(pw).Encode(cfg)
-		if err != nil {
-			panic(err)
-		}
-		pw.Close()
-	}()
-	cfg2 := new(arvados.Config)
-	err := json.NewDecoder(pr).Decode(cfg2)
-	if err != nil {
-		panic(err)
-	}
-	return cfg2
-func watchConfig(ctx context.Context, logger logrus.FieldLogger, cfgPath string, prevcfg *arvados.Config, fn func()) {
-	watcher, err := fsnotify.NewWatcher()
-	if err != nil {
-		logger.WithError(err).Error("fsnotify setup failed")
-		return
-	}
-	defer watcher.Close()
-	err = watcher.Add(cfgPath)
-	if err != nil {
-		logger.WithError(err).Error("fsnotify watcher failed")
-		return
-	}
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case err, ok := <-watcher.Errors:
-			if !ok {
-				return
-			}
-			logger.WithError(err).Warn("fsnotify watcher reported error")
-		case _, ok := <-watcher.Events:
-			if !ok {
-				return
-			}
-			for len(watcher.Events) > 0 {
-				<-watcher.Events
-			}
-			loader := config.NewLoader(&bytes.Buffer{}, &logrus.Logger{Out: ioutil.Discard})
-			loader.Path = cfgPath
-			loader.SkipAPICalls = true
-			cfg, err := loader.Load()
-			if err != nil {
-				logger.WithError(err).Warn("error reloading config file after change detected; ignoring new config for now")
-			} else if reflect.DeepEqual(cfg, prevcfg) {
-				logger.Debug("config file changed but is still DeepEqual to the existing config")
-			} else {
-				logger.Debug("config changed, notifying supervisor")
-				fn()
-				prevcfg = cfg
-			}
-		}
-	}
diff --git a/lib/config/load.go b/lib/config/load.go
index 9269ddf27..4ded8aa1a 100644
--- a/lib/config/load.go
+++ b/lib/config/load.go
@@ -198,6 +198,10 @@ func (ldr *Loader) loadBytes(path string) (buf []byte, sourceTime, loadTime time
+// Unload discards any config data cached by a previous call to Load,
+// so that the next call to Load reads ldr.Path again.
+func (ldr *Loader) Unload() {
+	ldr.configdata = nil
 func (ldr *Loader) Load() (*arvados.Config, error) {
 	if ldr.configdata == nil {
 		buf, sourceTime, loadTime, err := ldr.loadBytes(ldr.Path)
diff --git a/lib/config/watch.go b/lib/config/watch.go
new file mode 100644
index 000000000..3d6c6d7fc
--- /dev/null
+++ b/lib/config/watch.go
@@ -0,0 +1,142 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0
+package config
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+	"io/ioutil"
+	_ "net/http/pprof"
+	"reflect"
+	"time"
+	"git.arvados.org/arvados.git/sdk/go/arvados"
+	"github.com/fsnotify/fsnotify"
+	"github.com/sirupsen/logrus"
+// WatchConfig starts a goroutine that calls fn when the config file
+// at ldr.Path changes in such a way that the new content is loadable
+// and semantically different from the previous config.
+//
+// After fn is called, the next call to Load() will return the updated
+// config.
+//
+// If ldr.Path is empty or "-", or the loaded config has
+// AutoReloadConfig turned off, WatchConfig does nothing and returns
+// nil. It returns an error if the current config cannot be loaded or
+// copied, or if the fsnotify watcher cannot be created.
+//
+// The watching goroutine runs until ctx is cancelled (or the fsnotify
+// watcher's channels are closed).
+//
+// NOTE(review): the watching goroutine assigns ldr.configdata without
+// synchronization, so Load() must not run concurrently with a reload
+// -- confirm callers serialize this, or guard configdata with a mutex.
+func (ldr *Loader) WatchConfig(ctx context.Context, fn func()) error {
+	if ldr.Path == "" || ldr.Path == "-" {
+		ldr.Logger.Debugf("AutoReloadConfig is disabled because config %q is not a regular file", ldr.Path)
+		return nil
+	}
+	cfg, err := ldr.Load()
+	if err != nil {
+		return err
+	}
+	if !cfg.AutoReloadConfig {
+		ldr.Logger.Debug("AutoReloadConfig is disabled, not watching config")
+		return nil
+	}
+	// Reloaded configs are compared against a deep copy of cfg, made
+	// by a JSON round trip (presumably so that later mutations of the
+	// *arvados.Config returned by Load cannot affect the comparison).
+	pr, pw := io.Pipe()
+	go func() {
+		// CloseWithError(nil) behaves like Close, so the decoder sees
+		// a clean EOF after a successful encode. If encoding fails,
+		// the decoder sees the encode error instead of hanging. No
+		// state is shared with the caller, so there is no data race.
+		pw.CloseWithError(json.NewEncoder(pw).Encode(cfg))
+	}()
+	cfg2 := new(arvados.Config)
+	err = json.NewDecoder(pr).Decode(cfg2)
+	// Closing the read side unblocks the encoder goroutine if the
+	// decoder returned before consuming all of its output (e.g., on a
+	// decode error), so the goroutine cannot leak.
+	pr.Close()
+	if err != nil {
+		return fmt.Errorf("error copying config: %w", err)
+	}
+	watcher, err := fsnotify.NewWatcher()
+	if err != nil {
+		return fmt.Errorf("fsnotify setup failed: %w", err)
+	}
+	go watchConfig(ctx, ldr.Logger, ldr.Path, cfg2, watcher, func(configdata []byte) {
+		ldr.configdata = configdata
+		fn()
+	})
+	return nil
+}
+// watchConfig watches cfgPath using watcher (which it closes before
+// returning). Whenever the file changes to content that loads
+// successfully and is not DeepEqual to prevcfg, it calls fn with the
+// raw config data. It returns when ctx is cancelled or either of the
+// watcher's channels is closed.
+func watchConfig(ctx context.Context, logger logrus.FieldLogger, cfgPath string, prevcfg *arvados.Config, watcher *fsnotify.Watcher, fn func([]byte)) {
+	defer watcher.Close()
+	// rewatch (re)registers cfgPath with the watcher. It retries with
+	// exponential backoff, capped at one minute, until Add succeeds.
+	// It returns false if ctx is cancelled first, so a config file
+	// that never reappears cannot keep this goroutine alive forever.
+	rewatch := func() bool {
+		// Remove may fail (e.g., if the file was renamed away or
+		// deleted and its watch already dropped). The error is
+		// deliberately ignored: Add below re-establishes the watch.
+		watcher.Remove(cfgPath)
+		delay := time.Second / 10
+		for {
+			err := watcher.Add(cfgPath)
+			if err == nil {
+				return true
+			}
+			logger.WithError(err).WithField("file", cfgPath).Warn("fsnotify watch failed")
+			select {
+			case <-ctx.Done():
+				return false
+			case <-time.After(delay):
+			}
+			delay *= 2
+			if delay > time.Minute {
+				delay = time.Minute
+			}
+		}
+	}
+	if !rewatch() {
+		return
+	}
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case err, ok := <-watcher.Errors:
+			if !ok {
+				return
+			}
+			logger.WithError(err).Warn("fsnotify watcher reported error")
+		case _, ok := <-watcher.Events:
+			if !ok {
+				return
+			}
+			// Discard any further events already queued; a single
+			// reload below covers all of them.
+			for len(watcher.Events) > 0 {
+				<-watcher.Events
+			}
+			// We remove and re-add our watcher here so
+			// that, if someone renames a new config file
+			// into place (as they should), we receive
+			// events about the *new* file.
+			//
+			// Setting up the watcher here (before reading
+			// the new file) ensures we will get notified
+			// in the next loop iteration if the new file
+			// is changed or replaced before we even
+			// finish reading it.
+			if !rewatch() {
+				return
+			}
+			loader := NewLoader(&bytes.Buffer{}, &logrus.Logger{Out: ioutil.Discard})
+			loader.Path = cfgPath
+			loader.SkipAPICalls = true
+			cfg, err := loader.Load()
+			if err != nil {
+				logger.WithError(err).Warn("error reloading config file after change detected; ignoring new config for now")
+			} else if reflect.DeepEqual(cfg, prevcfg) {
+				logger.Debug("config file changed but is still DeepEqual to the existing config")
+			} else {
+				logger.Debug("config changed, notifying")
+				fn(loader.configdata)
+				prevcfg = cfg
+			}
+		}
+	}
+}
diff --git a/lib/service/cmd.go b/lib/service/cmd.go
index 20441c2a6..e950f05c8 100644
--- a/lib/service/cmd.go
+++ b/lib/service/cmd.go
@@ -103,33 +103,63 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
 		loader.SkipAPICalls = true
-	cfg, err := loader.Load()
-	if err != nil {
-		return 1
-	}
-	cluster, err := cfg.GetCluster("")
+	wantReload := make(chan struct{}, 1)
+	err = loader.WatchConfig(c.ctx, func() {
+		wantReload <- struct{}{}
+	})
 	if err != nil {
+		log.WithError(err).Error("exiting")
 		return 1
-	// Now that we've read the config, replace the bootstrap
-	// logger with a new one according to the logging config.
-	log = ctxlog.New(stderr, cluster.SystemLogs.Format, cluster.SystemLogs.LogLevel)
-	logger := log.WithFields(logrus.Fields{
-		"PID":       os.Getpid(),
-		"ClusterID": cluster.ClusterID,
-	})
-	ctx := ctxlog.Context(c.ctx, logger)
+	for {
+		cfg, err := loader.Load()
+		if err != nil {
+			log.WithError(err).Error("exiting")
+			return 1
+		}
+		cluster, err := cfg.GetCluster("")
+		if err != nil {
+			log.WithError(err).Error("exiting")
+			return 1
+		}
+		// Now that we've read the config, replace the bootstrap
+		// logger with a new one according to the logging config.
+		baseLogger := ctxlog.New(stderr, cluster.SystemLogs.Format, cluster.SystemLogs.LogLevel)
+		logger := baseLogger.WithFields(logrus.Fields{
+			"PID":       os.Getpid(),
+			"ClusterID": cluster.ClusterID,
+		})
+		ctx := ctxlog.Context(c.ctx, logger)
+		loader.Logger = logger
+		reg := prometheus.NewRegistry()
+		loader.RegisterMetrics(reg)
-	listenURL, internalURL, err := getListenAddr(cluster.Services, c.svcName, log)
+		ctx, cancel := context.WithCancel(ctx)
+		defer cancel()
+		go func() {
+			<-wantReload
+			cancel()
+		}()
+		err = c.run(ctx, cluster, baseLogger, logger, reg)
+		if err != nil && ctx.Err() == nil {
+			// run returned an error, and we aren't trying
+			// to cancel/reload -- something went wrong
+			log.WithError(err).Error("exiting")
+			return 1
+		}
+	}
+func (c *command) run(ctx context.Context, cluster *arvados.Cluster, baseLogger *logrus.Logger, logger logrus.FieldLogger, reg *prometheus.Registry) error {
+	listenURL, internalURL, err := getListenAddr(cluster.Services, c.svcName, logger)
 	if err != nil {
-		return 1
+		return err
 	ctx = context.WithValue(ctx, contextKeyURL{}, internalURL)
-	reg := prometheus.NewRegistry()
-	loader.RegisterMetrics(reg)
 	// arvados_version_running{version="1.2.3~4"} 1.0
 	mVersion := prometheus.NewGaugeVec(prometheus.GaugeOpts{
 		Namespace: "arvados",
@@ -140,11 +170,11 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
 	handler := c.newHandler(ctx, cluster, cluster.SystemRootToken, reg)
-	if err = handler.CheckHealth(); err != nil {
-		return 1
+	if err := handler.CheckHealth(); err != nil {
+		return err
-	instrumented := httpserver.Instrument(reg, log,
+	instrumented := httpserver.Instrument(reg, baseLogger,
 				httpserver.Inspect(reg, cluster.ManagementToken,
@@ -162,13 +192,13 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
 		tlsconfig, err := makeTLSConfig(cluster, logger)
 		if err != nil {
 			logger.WithError(err).Errorf("cannot start %s service on %s", c.svcName, listenURL.String())
-			return 1
+			return err
 		srv.TLSConfig = tlsconfig
 	err = srv.Start()
 	if err != nil {
-		return 1
+		return err
 		"URL":     listenURL,
@@ -191,9 +221,9 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
 	err = srv.Wait()
 	if err != nil {
-		return 1
+		return err
-	return 0
+	return nil
 // If an incoming request's target vhost has an embedded collection
diff --git a/lib/service/cmd_test.go b/lib/service/cmd_test.go
index 7db910927..4e145f8d5 100644
--- a/lib/service/cmd_test.go
+++ b/lib/service/cmd_test.go
@@ -10,11 +10,13 @@ import (
+	"io"
+	"strings"
@@ -273,3 +275,86 @@ func (th *testHandler) CheckHealth() error {
 	return nil
+func (*Suite) TestAutoReloadConfig(c *check.C) {
+	host := ""
+	ln, err := net.Listen("tcp", net.JoinHostPort(host, "0"))
+	c.Assert(err, check.IsNil)
+	hostport := ln.Addr().String()
+	ln.Close()
+	cf, err := ioutil.TempFile("", "cmd_test.")
+	c.Assert(err, check.IsNil)
+	defer os.Remove(cf.Name())
+	defer cf.Close()
+	fmt.Fprintf(cf, `
+AutoReloadConfig: true
+ zzzzz:
+  SystemRootToken: abcde
+  ManagementToken: firsttoken
+  Services:
+   Controller:
+    InternalURLs:
+     "http://`+hostport+`": {}
+    ExternalURL: https://zzzzz.example.com/
+	cf.Close()
+	serviceStartedWithToken := make(chan string)
+	serviceContextClosed := make(chan bool)
+	healthCheck := make(chan bool, 1)
+	cmd := Command(arvados.ServiceNameController, func(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) Handler {
+		serviceStartedWithToken <- cluster.ManagementToken
+		go func() {
+			<-ctx.Done()
+			serviceContextClosed <- true
+		}()
+		return &testHandler{ctx: ctx, healthCheck: healthCheck, handler: http.NotFoundHandler()}
+	})
+	testFinished := false
+	defer func() { testFinished = true }()
+	var stdin, stdout, stderr bytes.Buffer
+	go func() {
+		cmd.RunCommand("arvados-controller", []string{"-config", cf.Name()}, &stdin, &stdout, io.MultiWriter(&stderr, ctxlog.LogWriter(c.Log)))
+		if !testFinished {
+			panic("command exited before test finished")
+		}
+	}()
+	c.Check(<-serviceStartedWithToken, check.Equals, "firsttoken")
+	<-healthCheck
+	os.WriteFile(cf.Name()+".2", []byte(`Clusters: {zzzzz: {oops`), 0777)
+	os.Rename(cf.Name()+".2", cf.Name())
+	for deadline := time.Now().Add(time.Second); ; time.Sleep(time.Millisecond) {
+		if time.Now().After(deadline) {
+			c.Log("timed out waiting for warning about bad config file")
+			c.FailNow()
+		}
+		if strings.Contains(stderr.String(), "error reloading config file") {
+			break
+		}
+	}
+	os.WriteFile(cf.Name(), []byte(`
+AutoReloadConfig: true
+ zzzzz:
+  SystemRootToken: abcde
+  ManagementToken: secondtoken
+  Services:
+   Controller:
+    InternalURLs:
+     "http://`+hostport+`": {}
+    ExternalURL: https://zzzzz.example.com/
+`), 0777)
+	<-serviceContextClosed
+	c.Check(<-serviceStartedWithToken, check.Equals, "secondtoken")
+	<-healthCheck
+	// Check that the new/restarted service successfully listens
+	// on the same/reused port that the first one was using.
+	resp, err := http.Get("http://" + hostport + "/")
+	c.Check(err, check.IsNil)
+	c.Check(resp.StatusCode, check.Equals, http.StatusNotFound)



More information about the arvados-commits mailing list