[arvados] created: 2.6.0-112-ge3f543331

git repository hosting git at public.arvados.org
Wed May 3 20:42:04 UTC 2023

        at  e3f543331190073de81c52be3e0ba0a427e3c4dc (commit)

commit e3f543331190073de81c52be3e0ba0a427e3c4dc
Author: Tom Clegg <tom at curii.com>
Date:   Wed May 3 16:40:59 2023 -0400

    20475: Option to dump active requests when queue is >=90% full.
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index cf9406b2b..8203a94de 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -442,6 +442,15 @@ Clusters:
       # params_truncated.
       MaxRequestLogParamsSize: 2000
+      # In all services except RailsAPI, periodically check whether
+      # the incoming HTTP request queue is nearly full (at least 90%
+      # of MaxConcurrentRequests) and, if so, write a snapshot of the
+      # request queue to {service}-requests.json (for example,
+      # arvados-controller-requests.json) in the specified directory.
+      # This only takes effect when ManagementToken is also set.
+      #
+      # Leave blank to disable.
+      RequestQueueDumpDirectory: ""
       # Enable access controls for data stored in Keep. This should
diff --git a/lib/service/cmd.go b/lib/service/cmd.go
index 20441c2a6..cc6938cbc 100644
--- a/lib/service/cmd.go
+++ b/lib/service/cmd.go
@@ -12,10 +12,12 @@ import (
+	"net/http/httptest"
 	_ "net/http/pprof"
+	"time"
@@ -45,6 +47,8 @@ type command struct {
 	ctx        context.Context // enables tests to shutdown service; no public API yet
+var requestQueueDumpCheckInterval = time.Minute // how often requestQueueDumpCheck samples the request queue; a variable (not a constant) so tests can shorten it
 // Command returns a cmd.Handler that loads site config, calls
 // newHandler with the current cluster and node configs, and brings up
 // an http server with the returned handler.
@@ -189,6 +193,7 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
+	go c.requestQueueDumpCheck(cluster, prog, reg, &srv.Server, logger)
 	err = srv.Wait()
 	if err != nil {
 		return 1
@@ -196,6 +201,55 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
 	return 0
+// If SystemLogs.RequestQueueDumpDirectory is set, monitor the
+// server's incoming HTTP request queue size. When it exceeds 90% of
+// API.MaxConcurrentRequests, write the /_inspect/requests data to a
+// JSON file in the specified directory.
+func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, prog string, reg *prometheus.Registry, srv *http.Server, logger logrus.FieldLogger) {
+	outdir := cluster.SystemLogs.RequestQueueDumpDirectory
+	if outdir == "" || cluster.ManagementToken == "" {
+		return
+	}
+	logger = logger.WithField("worker", "RequestQueueDump")
+	outfile := outdir + "/" + prog + "-requests.json"
+	for range time.NewTicker(requestQueueDumpCheckInterval).C {
+		mfs, err := reg.Gather()
+		if err != nil {
+			logger.WithError(err).Warn("error getting metrics")
+			continue
+		}
+		dump := false
+		for _, mf := range mfs {
+			if mf.Name != nil && *mf.Name == "arvados_concurrent_requests" && len(mf.Metric) == 1 {
+				n := int(mf.Metric[0].GetGauge().GetValue())
+				if n > 0 && n >= cluster.API.MaxConcurrentRequests*9/10 {
+					dump = true
+					break
+				}
+			}
+		}
+		if dump {
+			req, err := http.NewRequest("GET", "/_inspect/requests", nil)
+			if err != nil {
+				logger.WithError(err).Warn("error in http.NewRequest")
+				continue
+			}
+			req.Header.Set("Authorization", "Bearer "+cluster.ManagementToken)
+			resp := httptest.NewRecorder()
+			srv.Handler.ServeHTTP(resp, req)
+			if code := resp.Result().StatusCode; code != http.StatusOK {
+				logger.WithField("StatusCode", code).Warn("error getting /_inspect/requests")
+				continue
+			}
+			err = os.WriteFile(outfile, resp.Body.Bytes(), 0777)
+			if err != nil {
+				logger.WithError(err).Warn("error writing file")
+				continue
+			}
+		}
+	}
 // If an incoming request's target vhost has an embedded collection
 // UUID or PDH, handle it with hTrue, otherwise handle it with
 // hFalse.
diff --git a/lib/service/cmd_test.go b/lib/service/cmd_test.go
index 7db910927..22b841513 100644
--- a/lib/service/cmd_test.go
+++ b/lib/service/cmd_test.go
@@ -9,12 +9,14 @@ import (
+	"encoding/json"
+	"strings"
@@ -192,6 +194,102 @@ func (*Suite) TestCommand(c *check.C) {
 	c.Check(stderr.String(), check.Matches, `(?ms).*"msg":"CheckHealth called".*`)
+// TestDumpRequests starts a controller service with
+// MaxConcurrentRequests set to max and RequestQueueDumpDirectory set
+// to a temp dir, holds max+1 client requests open, and checks that
+// arvados-controller-requests.json appears in the temp dir listing
+// the held request.
+func (*Suite) TestDumpRequests(c *check.C) {
+	// Poll the queue every 100ms instead of every minute, and
+	// restore the original interval when the test ends.
+	defer func(orig time.Duration) { requestQueueDumpCheckInterval = orig }(requestQueueDumpCheckInterval)
+	requestQueueDumpCheckInterval = time.Second / 10
+	tmpdir := c.MkDir()
+	cf, err := ioutil.TempFile(tmpdir, "cmd_test.")
+	c.Assert(err, check.IsNil)
+	defer os.Remove(cf.Name())
+	defer cf.Close()
+	max := 1
+	fmt.Fprintf(cf, `
+Clusters:
+ zzzzz:
+  SystemRootToken: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+  ManagementToken: bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
+  API: {MaxConcurrentRequests: %d}
+  SystemLogs: {RequestQueueDumpDirectory: %q}
+  Services:
+   Controller:
+    ExternalURL: "http://localhost:12345"
+    InternalURLs: {"http://localhost:12345": {}}
+`, max, tmpdir)
+	// Each handler call reports on started, then blocks until hold
+	// is closed, keeping the request active.
+	started := make(chan bool, max+1)
+	hold := make(chan bool)
+	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		started <- true
+		<-hold
+	})
+	healthCheck := make(chan bool, 1)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	cmd := Command(arvados.ServiceNameController, func(ctx context.Context, _ *arvados.Cluster, token string, reg *prometheus.Registry) Handler {
+		return &testHandler{ctx: ctx, handler: handler, healthCheck: healthCheck}
+	})
+	cmd.(*command).ctx = context.WithValue(ctx, contextKey, "bar")
+	exited := make(chan bool)
+	var stdin, stdout, stderr bytes.Buffer
+	go func() {
+		cmd.RunCommand("arvados-controller", []string{"-config", cf.Name()}, &stdin, &stdout, &stderr)
+		close(exited)
+	}()
+	select {
+	case <-healthCheck:
+	case <-exited:
+		c.Logf("%s", stderr.String())
+		c.Error("command exited without health check")
+	}
+	client := http.Client{}
+	deadline := time.Now().Add(time.Second * 2)
+	for i := 0; i < max+1; i++ {
+		go func() {
+			// Retry connection errors until the deadline, in
+			// case the server is not listening yet.
+			resp, err := client.Get("http://localhost:12345/testpath")
+			for err != nil && strings.Contains(err.Error(), "dial tcp") && deadline.After(time.Now()) {
+				time.Sleep(time.Second / 100)
+				resp, err = client.Get("http://localhost:12345/testpath")
+			}
+			if c.Check(err, check.IsNil) {
+				c.Logf("resp StatusCode %d", resp.StatusCode)
+			}
+		}()
+	}
+	// Wait until max requests are being handled concurrently.
+	for i := 0; i < max; i++ {
+		select {
+		case <-started:
+		case <-time.After(time.Second):
+			c.Logf("%s", stderr.String())
+			c.Fatal("timed out waiting for requests to start")
+		}
+	}
+	// Wait for the dump file. Keep polling while it is missing or
+	// empty (it may be observed while it is being rewritten).
+	dumpfile := tmpdir + "/arvados-controller-requests.json"
+	var j []byte
+	var readErr error
+	for {
+		j, readErr = os.ReadFile(dumpfile)
+		notReady := os.IsNotExist(readErr) || (readErr == nil && len(j) == 0)
+		if !notReady || !deadline.After(time.Now()) {
+			break
+		}
+		time.Sleep(time.Second / 100)
+	}
+	c.Check(readErr, check.IsNil)
+	c.Logf("%s", stderr.String())
+	c.Logf("%s", string(j))
+	var loaded []struct{ URL string }
+	err = json.Unmarshal(j, &loaded)
+	c.Check(err, check.IsNil)
+	// Only index into loaded if the length check passed, so a bad
+	// dump is reported as a failed check rather than a panic.
+	if c.Check(loaded, check.HasLen, max) {
+		c.Check(loaded[0].URL, check.Equals, "/testpath")
+	}
+	// Release the held requests, then shut the service down.
+	close(hold)
+	cancel()
+}
 func (*Suite) TestTLS(c *check.C) {
 	cwd, err := os.Getwd()
 	c.Assert(err, check.IsNil)
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index 01e9902c8..ee0e80513 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -221,9 +221,10 @@ type Cluster struct {
 		EmailFrom                      string
 	SystemLogs struct {
-		LogLevel                string
-		Format                  string
-		MaxRequestLogParamsSize int
+		LogLevel                  string
+		Format                    string
+		MaxRequestLogParamsSize   int
+		RequestQueueDumpDirectory string
 	TLS struct {
 		Certificate string



More information about the arvados-commits mailing list