[arvados] created: 2.1.0-2996-g08bf53a33

git repository hosting git at public.arvados.org
Fri Oct 28 17:51:00 UTC 2022


        at  08bf53a3396ab74e805d468ccbb9c0cea86a3d5a (commit)


commit 08bf53a3396ab74e805d468ccbb9c0cea86a3d5a
Author: Tom Clegg <tom at curii.com>
Date:   Fri Oct 28 11:08:39 2022 -0400

    18071: Improve test timeout.
    
    Test can take longer when output is going to a terminal. Keep trying
    if some progress is made every 3s, rather than giving up after 10s
    total.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go
index bdbdf1ef9..2d486da5f 100644
--- a/lib/dispatchcloud/dispatcher_test.go
+++ b/lib/dispatchcloud/dispatcher_test.go
@@ -193,12 +193,18 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
 	err := s.disp.CheckHealth()
 	c.Check(err, check.IsNil)
 
-	select {
-	case <-done:
-		c.Logf("containers finished (%s), waiting for instances to shutdown and queue to clear", time.Since(start))
-	case <-time.After(10 * time.Second):
-		c.Fatalf("timed out; still waiting for %d containers: %q", len(waiting), waiting)
+	for len(waiting) > 0 {
+		waswaiting := len(waiting)
+		select {
+		case <-done:
+			// loop will end because len(waiting)==0
+		case <-time.After(3 * time.Second):
+			if len(waiting) >= waswaiting {
+				c.Fatalf("timed out; no progress in 3s while waiting for %d containers: %q", len(waiting), waiting)
+			}
+		}
 	}
+	c.Logf("containers finished (%s), waiting for instances to shutdown and queue to clear", time.Since(start))
 
 	deadline := time.Now().Add(5 * time.Second)
 	for range time.NewTicker(10 * time.Millisecond).C {

commit 65b12213f740b117fb14822bce0dbb415257c355
Author: Tom Clegg <tom at curii.com>
Date:   Fri Oct 28 10:35:27 2022 -0400

    18071: Use dblock to prevent multiple dispatchers from competing.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/controller/dblock/dblock.go b/lib/controller/dblock/dblock.go
index 6f5db1006..ad2733abf 100644
--- a/lib/controller/dblock/dblock.go
+++ b/lib/controller/dblock/dblock.go
@@ -21,6 +21,7 @@ var (
 	ContainerLogSweep  = &DBLocker{key: 10002}
 	KeepBalanceService = &DBLocker{key: 10003} // keep-balance service in periodic-sweep loop
 	KeepBalanceActive  = &DBLocker{key: 10004} // keep-balance sweep in progress (either -once=true or service loop)
+	Dispatch           = &DBLocker{key: 10005} // any dispatcher running
 	retryDelay         = 5 * time.Second
 )
 
diff --git a/lib/controller/federation.go b/lib/controller/federation.go
index e7d6e29b8..93b8315a6 100644
--- a/lib/controller/federation.go
+++ b/lib/controller/federation.go
@@ -142,7 +142,7 @@ type CurrentUser struct {
 // non-nil, true, nil -- if the token is valid
 func (h *Handler) validateAPItoken(req *http.Request, token string) (*CurrentUser, bool, error) {
 	user := CurrentUser{Authorization: arvados.APIClientAuthorization{APIToken: token}}
-	db, err := h.db(req.Context())
+	db, err := h.dbConnector.GetDB(req.Context())
 	if err != nil {
 		ctxlog.FromContext(req.Context()).WithError(err).Debugf("validateAPItoken(%s): database error", token)
 		return nil, false, err
@@ -179,7 +179,7 @@ func (h *Handler) validateAPItoken(req *http.Request, token string) (*CurrentUse
 }
 
 func (h *Handler) createAPItoken(req *http.Request, userUUID string, scopes []string) (*arvados.APIClientAuthorization, error) {
-	db, err := h.db(req.Context())
+	db, err := h.dbConnector.GetDB(req.Context())
 	if err != nil {
 		return nil, err
 	}
diff --git a/lib/controller/handler.go b/lib/controller/handler.go
index e1392bef9..4c6fca7f7 100644
--- a/lib/controller/handler.go
+++ b/lib/controller/handler.go
@@ -6,7 +6,6 @@ package controller
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"net/http"
 	"net/http/httptest"
@@ -21,10 +20,8 @@ import (
 	"git.arvados.org/arvados.git/lib/controller/router"
 	"git.arvados.org/arvados.git/lib/ctrlctx"
 	"git.arvados.org/arvados.git/sdk/go/arvados"
-	"git.arvados.org/arvados.git/sdk/go/ctxlog"
 	"git.arvados.org/arvados.git/sdk/go/health"
 	"git.arvados.org/arvados.git/sdk/go/httpserver"
-	"github.com/jmoiron/sqlx"
 
 	// sqlx needs lib/pq to talk to PostgreSQL
 	_ "github.com/lib/pq"
@@ -40,8 +37,7 @@ type Handler struct {
 	proxy          *proxy
 	secureClient   *http.Client
 	insecureClient *http.Client
-	pgdb           *sqlx.DB
-	pgdbMtx        sync.Mutex
+	dbConnector    ctrlctx.DBConnector
 }
 
 func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
@@ -65,7 +61,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 
 func (h *Handler) CheckHealth() error {
 	h.setupOnce.Do(h.setup)
-	_, err := h.db(context.TODO())
+	_, err := h.dbConnector.GetDB(context.TODO())
 	if err != nil {
 		return err
 	}
@@ -97,17 +93,18 @@ func (h *Handler) setup() {
 	mux := http.NewServeMux()
 	healthFuncs := make(map[string]health.Func)
 
-	oidcAuthorizer := localdb.OIDCAccessTokenAuthorizer(h.Cluster, h.db)
+	h.dbConnector = ctrlctx.DBConnector{PostgreSQL: h.Cluster.PostgreSQL}
+	oidcAuthorizer := localdb.OIDCAccessTokenAuthorizer(h.Cluster, h.dbConnector.GetDB)
 	h.federation = federation.New(h.Cluster, &healthFuncs)
 	rtr := router.New(h.federation, router.Config{
 		MaxRequestSize: h.Cluster.API.MaxRequestSize,
 		WrapCalls: api.ComposeWrappers(
-			ctrlctx.WrapCallsInTransactions(h.db),
+			ctrlctx.WrapCallsInTransactions(h.dbConnector.GetDB),
 			oidcAuthorizer.WrapCalls,
 			ctrlctx.WrapCallsWithAuth(h.Cluster)),
 	})
 
-	healthRoutes := health.Routes{"ping": func() error { _, err := h.db(context.TODO()); return err }}
+	healthRoutes := health.Routes{"ping": func() error { _, err := h.dbConnector.GetDB(context.TODO()); return err }}
 	for name, f := range healthFuncs {
 		healthRoutes[name] = f
 	}
@@ -158,31 +155,6 @@ func (h *Handler) setup() {
 	go h.containerLogSweepWorker()
 }
 
-var errDBConnection = errors.New("database connection error")
-
-func (h *Handler) db(ctx context.Context) (*sqlx.DB, error) {
-	h.pgdbMtx.Lock()
-	defer h.pgdbMtx.Unlock()
-	if h.pgdb != nil {
-		return h.pgdb, nil
-	}
-
-	db, err := sqlx.Open("postgres", h.Cluster.PostgreSQL.Connection.String())
-	if err != nil {
-		ctxlog.FromContext(ctx).WithError(err).Error("postgresql connect failed")
-		return nil, errDBConnection
-	}
-	if p := h.Cluster.PostgreSQL.ConnectionPool; p > 0 {
-		db.SetMaxOpenConns(p)
-	}
-	if err := db.Ping(); err != nil {
-		ctxlog.FromContext(ctx).WithError(err).Error("postgresql connect succeeded but ping failed")
-		return nil, errDBConnection
-	}
-	h.pgdb = db
-	return db, nil
-}
-
 type middlewareFunc func(http.ResponseWriter, *http.Request, http.Handler)
 
 func prepend(next http.Handler, middleware middlewareFunc) http.Handler {
diff --git a/lib/controller/handler_test.go b/lib/controller/handler_test.go
index 0ffe0255f..c9999fa28 100644
--- a/lib/controller/handler_test.go
+++ b/lib/controller/handler_test.go
@@ -477,7 +477,7 @@ func (s *HandlerSuite) TestTrashSweep(c *check.C) {
 	coll, err := s.handler.federation.CollectionCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{"name": "test trash sweep"}, EnsureUniqueName: true})
 	c.Assert(err, check.IsNil)
 	defer s.handler.federation.CollectionDelete(ctx, arvados.DeleteOptions{UUID: coll.UUID})
-	db, err := s.handler.db(s.ctx)
+	db, err := s.handler.dbConnector.GetDB(s.ctx)
 	c.Assert(err, check.IsNil)
 	_, err = db.ExecContext(s.ctx, `update collections set trash_at = $1, delete_at = $2 where uuid = $3`, time.Now().UTC().Add(time.Second/10), time.Now().UTC().Add(time.Hour), coll.UUID)
 	c.Assert(err, check.IsNil)
@@ -550,7 +550,7 @@ func (s *HandlerSuite) TestLogActivity(c *check.C) {
 			c.Assert(err, check.IsNil)
 		}
 	}
-	db, err := s.handler.db(s.ctx)
+	db, err := s.handler.dbConnector.GetDB(s.ctx)
 	c.Assert(err, check.IsNil)
 	for _, userUUID := range []string{arvadostest.ActiveUserUUID, arvadostest.SpectatorUserUUID} {
 		var rows int
diff --git a/lib/controller/trash.go b/lib/controller/trash.go
index afdf95b78..99e7aec0b 100644
--- a/lib/controller/trash.go
+++ b/lib/controller/trash.go
@@ -20,7 +20,7 @@ func (h *Handler) periodicWorker(workerName string, interval time.Duration, lock
 		logger.Debugf("interval is %v, not running worker", interval)
 		return
 	}
-	if !locker.Lock(ctx, h.db) {
+	if !locker.Lock(ctx, h.dbConnector.GetDB) {
 		// context canceled
 		return
 	}
@@ -47,7 +47,7 @@ func (h *Handler) trashSweepWorker() {
 
 func (h *Handler) containerLogSweepWorker() {
 	h.periodicWorker("container log sweep", h.Cluster.Containers.Logging.SweepInterval.Duration(), dblock.ContainerLogSweep, func(ctx context.Context) error {
-		db, err := h.db(ctx)
+		db, err := h.dbConnector.GetDB(ctx)
 		if err != nil {
 			return err
 		}
diff --git a/lib/ctrlctx/db.go b/lib/ctrlctx/db.go
index a76420860..2a05096ce 100644
--- a/lib/ctrlctx/db.go
+++ b/lib/ctrlctx/db.go
@@ -10,6 +10,7 @@ import (
 	"sync"
 
 	"git.arvados.org/arvados.git/lib/controller/api"
+	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"git.arvados.org/arvados.git/sdk/go/ctxlog"
 	"github.com/jmoiron/sqlx"
 
@@ -142,3 +143,33 @@ func CurrentTx(ctx context.Context) (*sqlx.Tx, error) {
 	})
 	return txn.tx, txn.err
 }
+
+var errDBConnection = errors.New("database connection error")
+
+type DBConnector struct {
+	PostgreSQL arvados.PostgreSQL
+	pgdb       *sqlx.DB
+	mtx        sync.Mutex
+}
+
+func (dbc *DBConnector) GetDB(ctx context.Context) (*sqlx.DB, error) {
+	dbc.mtx.Lock()
+	defer dbc.mtx.Unlock()
+	if dbc.pgdb != nil {
+		return dbc.pgdb, nil
+	}
+	db, err := sqlx.Open("postgres", dbc.PostgreSQL.Connection.String())
+	if err != nil {
+		ctxlog.FromContext(ctx).WithError(err).Error("postgresql connect failed")
+		return nil, errDBConnection
+	}
+	if p := dbc.PostgreSQL.ConnectionPool; p > 0 {
+		db.SetMaxOpenConns(p)
+	}
+	if err := db.Ping(); err != nil {
+		ctxlog.FromContext(ctx).WithError(err).Error("postgresql connect succeeded but ping failed")
+		return nil, errDBConnection
+	}
+	dbc.pgdb = db
+	return db, nil
+}
diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
index ae91a710e..3403c50c9 100644
--- a/lib/dispatchcloud/dispatcher.go
+++ b/lib/dispatchcloud/dispatcher.go
@@ -15,6 +15,8 @@ import (
 	"time"
 
 	"git.arvados.org/arvados.git/lib/cloud"
+	"git.arvados.org/arvados.git/lib/controller/dblock"
+	"git.arvados.org/arvados.git/lib/ctrlctx"
 	"git.arvados.org/arvados.git/lib/dispatchcloud/container"
 	"git.arvados.org/arvados.git/lib/dispatchcloud/scheduler"
 	"git.arvados.org/arvados.git/lib/dispatchcloud/sshexecutor"
@@ -53,6 +55,7 @@ type dispatcher struct {
 	Registry      *prometheus.Registry
 	InstanceSetID cloud.InstanceSetID
 
+	dbConnector ctrlctx.DBConnector
 	logger      logrus.FieldLogger
 	instanceSet cloud.InstanceSet
 	pool        pool
@@ -118,6 +121,7 @@ func (disp *dispatcher) setup() {
 
 func (disp *dispatcher) initialize() {
 	disp.logger = ctxlog.FromContext(disp.Context)
+	disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.Cluster.PostgreSQL}
 
 	disp.ArvClient.AuthToken = disp.AuthToken
 
@@ -143,6 +147,7 @@ func (disp *dispatcher) initialize() {
 	if err != nil {
 		disp.logger.Fatalf("error initializing driver: %s", err)
 	}
+	dblock.Dispatch.Lock(disp.Context, disp.dbConnector.GetDB)
 	disp.instanceSet = instanceSet
 	disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.Registry, disp.InstanceSetID, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
 	disp.queue = container.NewQueue(disp.logger, disp.Registry, disp.typeChooser, disp.ArvClient)
@@ -175,6 +180,7 @@ func (disp *dispatcher) initialize() {
 }
 
 func (disp *dispatcher) run() {
+	defer dblock.Dispatch.Unlock()
 	defer close(disp.stopped)
 	defer disp.instanceSet.Stop()
 	defer disp.pool.Stop()
diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go
index 829a05363..bdbdf1ef9 100644
--- a/lib/dispatchcloud/dispatcher_test.go
+++ b/lib/dispatchcloud/dispatcher_test.go
@@ -15,6 +15,7 @@ import (
 	"sync"
 	"time"
 
+	"git.arvados.org/arvados.git/lib/config"
 	"git.arvados.org/arvados.git/lib/dispatchcloud/test"
 	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"git.arvados.org/arvados.git/sdk/go/arvadostest"
@@ -49,8 +50,16 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
 		MinTimeBetweenCreateCalls: time.Millisecond,
 	}
 
+	// We need the postgresql connection info from the integration
+	// test config.
+	cfg, err := config.NewLoader(nil, ctxlog.FromContext(s.ctx)).Load()
+	c.Assert(err, check.IsNil)
+	testcluster, err := cfg.GetCluster("")
+	c.Assert(err, check.IsNil)
+
 	s.cluster = &arvados.Cluster{
 		ManagementToken: "test-management-token",
+		PostgreSQL:      testcluster.PostgreSQL,
 		Containers: arvados.ContainersConfig{
 			CrunchRunCommand:       "crunch-run",
 			CrunchRunArgumentsList: []string{"--foo", "--extra='args'"},
diff --git a/lib/lsf/dispatch.go b/lib/lsf/dispatch.go
index d362f66d1..d1408d23c 100644
--- a/lib/lsf/dispatch.go
+++ b/lib/lsf/dispatch.go
@@ -18,6 +18,8 @@ import (
 	"time"
 
 	"git.arvados.org/arvados.git/lib/cmd"
+	"git.arvados.org/arvados.git/lib/controller/dblock"
+	"git.arvados.org/arvados.git/lib/ctrlctx"
 	"git.arvados.org/arvados.git/lib/dispatchcloud"
 	"git.arvados.org/arvados.git/lib/service"
 	"git.arvados.org/arvados.git/sdk/go/arvados"
@@ -58,6 +60,7 @@ type dispatcher struct {
 	Registry  *prometheus.Registry
 
 	logger        logrus.FieldLogger
+	dbConnector   ctrlctx.DBConnector
 	lsfcli        lsfcli
 	lsfqueue      lsfqueue
 	arvDispatcher *dispatch.Dispatcher
@@ -73,7 +76,9 @@ type dispatcher struct {
 func (disp *dispatcher) Start() {
 	disp.initOnce.Do(func() {
 		disp.init()
+		dblock.Dispatch.Lock(context.Background(), disp.dbConnector.GetDB)
 		go func() {
+			defer dblock.Dispatch.Unlock()
 			disp.checkLsfQueueForOrphans()
 			err := disp.arvDispatcher.Run(disp.Context)
 			if err != nil {
@@ -125,6 +130,7 @@ func (disp *dispatcher) init() {
 		lsfcli: &disp.lsfcli,
 	}
 	disp.ArvClient.AuthToken = disp.AuthToken
+	disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.Cluster.PostgreSQL}
 	disp.stop = make(chan struct{}, 1)
 	disp.stopped = make(chan struct{})
 
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index ac394e114..1c0f6ad28 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -19,6 +19,8 @@ import (
 	"time"
 
 	"git.arvados.org/arvados.git/lib/cmd"
+	"git.arvados.org/arvados.git/lib/controller/dblock"
+	"git.arvados.org/arvados.git/lib/ctrlctx"
 	"git.arvados.org/arvados.git/lib/dispatchcloud"
 	"git.arvados.org/arvados.git/lib/service"
 	"git.arvados.org/arvados.git/sdk/go/arvados"
@@ -55,10 +57,11 @@ const initialNiceValue int64 = 10000
 
 type Dispatcher struct {
 	*dispatch.Dispatcher
-	logger  logrus.FieldLogger
-	cluster *arvados.Cluster
-	sqCheck *SqueueChecker
-	slurm   Slurm
+	logger      logrus.FieldLogger
+	cluster     *arvados.Cluster
+	sqCheck     *SqueueChecker
+	slurm       Slurm
+	dbConnector ctrlctx.DBConnector
 
 	done chan struct{}
 	err  error
@@ -90,6 +93,7 @@ func (disp *Dispatcher) configure() error {
 	disp.Client.APIHost = disp.cluster.Services.Controller.ExternalURL.Host
 	disp.Client.AuthToken = disp.cluster.SystemRootToken
 	disp.Client.Insecure = disp.cluster.TLS.Insecure
+	disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.cluster.PostgreSQL}
 
 	if disp.Client.APIHost != "" || disp.Client.AuthToken != "" {
 		// Copy real configs into env vars so [a]
@@ -137,6 +141,8 @@ func (disp *Dispatcher) setup() {
 }
 
 func (disp *Dispatcher) run() error {
+	dblock.Dispatch.Lock(context.Background(), disp.dbConnector.GetDB)
+	defer dblock.Dispatch.Unlock()
 	defer disp.sqCheck.Stop()
 
 	if disp.cluster != nil && len(disp.cluster.InstanceTypes) > 0 {

commit e33a15001d7a94a805a5d0d4c77544d959974193
Author: Tom Clegg <tom at curii.com>
Date:   Thu Oct 27 11:05:42 2022 -0400

    18071: Log host:port of other process(es) holding lock.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/controller/dblock/dblock.go b/lib/controller/dblock/dblock.go
index 472633747..6f5db1006 100644
--- a/lib/controller/dblock/dblock.go
+++ b/lib/controller/dblock/dblock.go
@@ -7,6 +7,8 @@ package dblock
 import (
 	"context"
 	"database/sql"
+	"fmt"
+	"net"
 	"sync"
 	"time"
 
@@ -36,7 +38,8 @@ type DBLocker struct {
 //
 // Returns false if ctx is canceled before the lock is acquired.
 func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) bool {
-	logger := ctxlog.FromContext(ctx)
+	logger := ctxlog.FromContext(ctx).WithField("ID", dbl.key)
+	var lastHeldBy string
 	for ; ; time.Sleep(retryDelay) {
 		dbl.mtx.Lock()
 		if dbl.conn != nil {
@@ -54,7 +57,7 @@ func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sql
 			dbl.mtx.Unlock()
 			return false
 		} else if err != nil {
-			logger.WithError(err).Infof("error getting database pool")
+			logger.WithError(err).Info("error getting database pool")
 			dbl.mtx.Unlock()
 			continue
 		}
@@ -72,17 +75,31 @@ func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sql
 		if err == context.Canceled {
 			return false
 		} else if err != nil {
-			logger.WithError(err).Infof("error getting pg_try_advisory_lock %d", dbl.key)
+			logger.WithError(err).Info("error getting pg_try_advisory_lock")
 			conn.Close()
 			dbl.mtx.Unlock()
 			continue
 		}
 		if !locked {
+			var host string
+			var port int
+			err = conn.QueryRowContext(ctx, `SELECT client_addr, client_port FROM pg_stat_activity WHERE pid IN
+				(SELECT pid FROM pg_locks
+				 WHERE locktype = $1 AND objid = $2)`, "advisory", dbl.key).Scan(&host, &port)
+			if err != nil {
+				logger.WithError(err).Info("error getting other client info")
+			} else {
+				heldBy := net.JoinHostPort(host, fmt.Sprintf("%d", port))
+				if lastHeldBy != heldBy {
+					logger.WithField("DBClient", heldBy).Info("waiting for other process to release lock")
+					lastHeldBy = heldBy
+				}
+			}
 			conn.Close()
 			dbl.mtx.Unlock()
 			continue
 		}
-		logger.Debugf("acquired pg_advisory_lock %d", dbl.key)
+		logger.Debug("acquired pg_advisory_lock")
 		dbl.ctx, dbl.getdb, dbl.conn = ctx, getdb, conn
 		dbl.mtx.Unlock()
 		return true
@@ -102,7 +119,7 @@ func (dbl *DBLocker) Check() bool {
 		dbl.mtx.Unlock()
 		return false
 	} else if err == nil {
-		ctxlog.FromContext(dbl.ctx).Debugf("pg_advisory_lock %d connection still alive", dbl.key)
+		ctxlog.FromContext(dbl.ctx).WithField("ID", dbl.key).Debug("connection still alive")
 		dbl.mtx.Unlock()
 		return true
 	}
@@ -120,9 +137,9 @@ func (dbl *DBLocker) Unlock() {
 	if dbl.conn != nil {
 		_, err := dbl.conn.ExecContext(context.Background(), `SELECT pg_advisory_unlock($1)`, dbl.key)
 		if err != nil {
-			ctxlog.FromContext(dbl.ctx).WithError(err).Infof("error releasing pg_advisory_lock %d", dbl.key)
+			ctxlog.FromContext(dbl.ctx).WithError(err).WithField("ID", dbl.key).Info("error releasing pg_advisory_lock")
 		} else {
-			ctxlog.FromContext(dbl.ctx).Debugf("released pg_advisory_lock %d", dbl.key)
+			ctxlog.FromContext(dbl.ctx).WithField("ID", dbl.key).Debug("released pg_advisory_lock")
 		}
 		dbl.conn.Close()
 		dbl.conn = nil
diff --git a/lib/controller/dblock/dblock_test.go b/lib/controller/dblock/dblock_test.go
new file mode 100644
index 000000000..6079df6f8
--- /dev/null
+++ b/lib/controller/dblock/dblock_test.go
@@ -0,0 +1,91 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dblock
+
+import (
+	"bytes"
+	"context"
+	"sync"
+	"testing"
+	"time"
+
+	"git.arvados.org/arvados.git/lib/config"
+	"git.arvados.org/arvados.git/sdk/go/arvados"
+	"git.arvados.org/arvados.git/sdk/go/arvadostest"
+	"git.arvados.org/arvados.git/sdk/go/ctxlog"
+	"github.com/jmoiron/sqlx"
+	"github.com/sirupsen/logrus"
+	check "gopkg.in/check.v1"
+)
+
+func Test(t *testing.T) {
+	check.TestingT(t)
+}
+
+var _ = check.Suite(&suite{})
+
+type suite struct {
+	cluster *arvados.Cluster
+	db      *sqlx.DB
+	getdb   func(context.Context) (*sqlx.DB, error)
+}
+
+var testLocker = &DBLocker{key: 999}
+
+func (s *suite) SetUpSuite(c *check.C) {
+	cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+	c.Assert(err, check.IsNil)
+	s.cluster, err = cfg.GetCluster("")
+	c.Assert(err, check.IsNil)
+	s.db = arvadostest.DB(c, s.cluster)
+	s.getdb = func(context.Context) (*sqlx.DB, error) { return s.db, nil }
+}
+
+func (s *suite) TestLock(c *check.C) {
+	retryDelay = time.Millisecond
+
+	var logbuf bytes.Buffer
+	logger := ctxlog.New(&logbuf, "text", "debug")
+	logger.Level = logrus.DebugLevel
+	ctx := ctxlog.Context(context.Background(), logger)
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	testLocker.Lock(ctx, s.getdb)
+	testLocker.Check()
+
+	lock2 := make(chan bool)
+	var wg sync.WaitGroup
+	defer wg.Wait()
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		testLocker2 := &DBLocker{key: 999}
+		testLocker2.Lock(ctx, s.getdb)
+		close(lock2)
+		testLocker2.Check()
+		testLocker2.Unlock()
+	}()
+
+	// Second lock should wait for first to Unlock
+	select {
+	case <-time.After(time.Second / 10):
+		c.Check(logbuf.String(), check.Matches, `(?ms).*level=info.*DBClient="[^"]+:\d+".*ID=999.*`)
+	case <-lock2:
+		c.Log("double-lock")
+		c.Fail()
+	}
+
+	testLocker.Check()
+	testLocker.Unlock()
+
+	// Now the second lock should succeed within retryDelay
+	select {
+	case <-time.After(retryDelay * 2):
+		c.Log("timed out")
+		c.Fail()
+	case <-lock2:
+	}
+	c.Logf("%s", logbuf.String())
+}

commit aa9507e1633819259794ea4d6cf391dc88621dac
Author: Tom Clegg <tom at curii.com>
Date:   Wed Oct 26 17:00:42 2022 -0400

    18071: Use dblock to avoid concurrent keep-balance ops.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/controller/dblock/dblock.go b/lib/controller/dblock/dblock.go
index a46201bb4..472633747 100644
--- a/lib/controller/dblock/dblock.go
+++ b/lib/controller/dblock/dblock.go
@@ -15,9 +15,11 @@ import (
 )
 
 var (
-	TrashSweep        = &DBLocker{key: 10001}
-	ContainerLogSweep = &DBLocker{key: 10002}
-	retryDelay        = 5 * time.Second
+	TrashSweep         = &DBLocker{key: 10001}
+	ContainerLogSweep  = &DBLocker{key: 10002}
+	KeepBalanceService = &DBLocker{key: 10003} // keep-balance service in periodic-sweep loop
+	KeepBalanceActive  = &DBLocker{key: 10004} // keep-balance sweep in progress (either -once=true or service loop)
+	retryDelay         = 5 * time.Second
 )
 
 // DBLocker uses pg_advisory_lock to maintain a cluster-wide lock for
@@ -31,7 +33,9 @@ type DBLocker struct {
 }
 
 // Lock acquires the advisory lock, waiting/reconnecting if needed.
-func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) {
+//
+// Returns false if ctx is canceled before the lock is acquired.
+func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) bool {
 	logger := ctxlog.FromContext(ctx)
 	for ; ; time.Sleep(retryDelay) {
 		dbl.mtx.Lock()
@@ -41,21 +45,33 @@ func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sql
 			dbl.mtx.Unlock()
 			continue
 		}
+		if ctx.Err() != nil {
+			dbl.mtx.Unlock()
+			return false
+		}
 		db, err := getdb(ctx)
-		if err != nil {
+		if err == context.Canceled {
+			dbl.mtx.Unlock()
+			return false
+		} else if err != nil {
 			logger.WithError(err).Infof("error getting database pool")
 			dbl.mtx.Unlock()
 			continue
 		}
 		conn, err := db.Conn(ctx)
-		if err != nil {
+		if err == context.Canceled {
+			dbl.mtx.Unlock()
+			return false
+		} else if err != nil {
 			logger.WithError(err).Info("error getting database connection")
 			dbl.mtx.Unlock()
 			continue
 		}
 		var locked bool
 		err = conn.QueryRowContext(ctx, `SELECT pg_try_advisory_lock($1)`, dbl.key).Scan(&locked)
-		if err != nil {
+		if err == context.Canceled {
+			return false
+		} else if err != nil {
 			logger.WithError(err).Infof("error getting pg_try_advisory_lock %d", dbl.key)
 			conn.Close()
 			dbl.mtx.Unlock()
@@ -69,27 +85,33 @@ func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sql
 		logger.Debugf("acquired pg_advisory_lock %d", dbl.key)
 		dbl.ctx, dbl.getdb, dbl.conn = ctx, getdb, conn
 		dbl.mtx.Unlock()
-		return
+		return true
 	}
 }
 
 // Check confirms that the lock is still active (i.e., the session is
 // still alive), and re-acquires if needed. Panics if Lock is not
 // acquired first.
-func (dbl *DBLocker) Check() {
+//
+// Returns false if the context passed to Lock() is canceled before
+// the lock is confirmed or reacquired.
+func (dbl *DBLocker) Check() bool {
 	dbl.mtx.Lock()
 	err := dbl.conn.PingContext(dbl.ctx)
-	if err == nil {
+	if err == context.Canceled {
+		dbl.mtx.Unlock()
+		return false
+	} else if err == nil {
 		ctxlog.FromContext(dbl.ctx).Debugf("pg_advisory_lock %d connection still alive", dbl.key)
 		dbl.mtx.Unlock()
-		return
+		return true
 	}
 	ctxlog.FromContext(dbl.ctx).WithError(err).Info("database connection ping failed")
 	dbl.conn.Close()
 	dbl.conn = nil
 	ctx, getdb := dbl.ctx, dbl.getdb
 	dbl.mtx.Unlock()
-	dbl.Lock(ctx, getdb)
+	return dbl.Lock(ctx, getdb)
 }
 
 func (dbl *DBLocker) Unlock() {
diff --git a/lib/controller/trash.go b/lib/controller/trash.go
index 9a7b0814c..afdf95b78 100644
--- a/lib/controller/trash.go
+++ b/lib/controller/trash.go
@@ -20,10 +20,16 @@ func (h *Handler) periodicWorker(workerName string, interval time.Duration, lock
 		logger.Debugf("interval is %v, not running worker", interval)
 		return
 	}
-	locker.Lock(ctx, h.db)
+	if !locker.Lock(ctx, h.db) {
+		// context canceled
+		return
+	}
 	defer locker.Unlock()
 	for time.Sleep(interval); ctx.Err() == nil; time.Sleep(interval) {
-		locker.Check()
+		if !locker.Check() {
+			// context canceled
+			return
+		}
 		err := run(ctx)
 		if err != nil {
 			logger.WithError(err).Infof("%s failed", workerName)
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 1dedb409a..50c4dae18 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -23,7 +23,9 @@ import (
 	"syscall"
 	"time"
 
+	"git.arvados.org/arvados.git/lib/controller/dblock"
 	"git.arvados.org/arvados.git/sdk/go/arvados"
+	"git.arvados.org/arvados.git/sdk/go/ctxlog"
 	"git.arvados.org/arvados.git/sdk/go/keepclient"
 	"github.com/jmoiron/sqlx"
 	"github.com/sirupsen/logrus"
@@ -70,13 +72,20 @@ type Balancer struct {
 //
 // Typical usage:
 //
-//   runOptions, err = (&Balancer{}).Run(config, runOptions)
-func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
+//	runOptions, err = (&Balancer{}).Run(config, runOptions)
+func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
 	nextRunOptions = runOptions
 
+	ctxlog.FromContext(ctx).Info("acquiring active lock")
+	if !dblock.KeepBalanceActive.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return bal.DB, nil }) {
+		// context canceled
+		return
+	}
+	defer dblock.KeepBalanceActive.Unlock()
+
 	defer bal.time("sweep", "wall clock time to run one full sweep")()
 
-	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
+	ctx, cancel := context.WithDeadline(ctx, time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
 	defer cancel()
 
 	var lbFile *os.File
diff --git a/services/keep-balance/balance_run_test.go b/services/keep-balance/balance_run_test.go
index 2db7bea17..4772da55a 100644
--- a/services/keep-balance/balance_run_test.go
+++ b/services/keep-balance/balance_run_test.go
@@ -6,6 +6,7 @@ package keepbalance
 
 import (
 	"bytes"
+	"context"
 	"encoding/json"
 	"fmt"
 	"io"
@@ -372,7 +373,7 @@ func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
 	trashReqs := s.stub.serveKeepstoreTrash()
 	pullReqs := s.stub.serveKeepstorePull()
 	srv := s.newServer(&opts)
-	_, err = srv.runOnce()
+	_, err = srv.runOnce(context.Background())
 	c.Check(err, check.ErrorMatches, "received zero collections")
 	c.Check(trashReqs.Count(), check.Equals, 4)
 	c.Check(pullReqs.Count(), check.Equals, 0)
@@ -391,7 +392,7 @@ func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
 	trashReqs := s.stub.serveKeepstoreTrash()
 	pullReqs := s.stub.serveKeepstorePull()
 	srv := s.newServer(&opts)
-	_, err := srv.runOnce()
+	_, err := srv.runOnce(context.Background())
 	c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
 	c.Check(trashReqs.Count(), check.Equals, 0)
 	c.Check(pullReqs.Count(), check.Equals, 0)
@@ -417,7 +418,7 @@ func (s *runSuite) TestRefuseSameDeviceDifferentVolumes(c *check.C) {
 	trashReqs := s.stub.serveKeepstoreTrash()
 	pullReqs := s.stub.serveKeepstorePull()
 	srv := s.newServer(&opts)
-	_, err := srv.runOnce()
+	_, err := srv.runOnce(context.Background())
 	c.Check(err, check.ErrorMatches, "cannot continue with config errors.*")
 	c.Check(trashReqs.Count(), check.Equals, 0)
 	c.Check(pullReqs.Count(), check.Equals, 0)
@@ -442,7 +443,7 @@ func (s *runSuite) TestWriteLostBlocks(c *check.C) {
 	s.stub.serveKeepstorePull()
 	srv := s.newServer(&opts)
 	c.Assert(err, check.IsNil)
-	_, err = srv.runOnce()
+	_, err = srv.runOnce(context.Background())
 	c.Check(err, check.IsNil)
 	lost, err := ioutil.ReadFile(lostf.Name())
 	c.Assert(err, check.IsNil)
@@ -463,7 +464,7 @@ func (s *runSuite) TestDryRun(c *check.C) {
 	trashReqs := s.stub.serveKeepstoreTrash()
 	pullReqs := s.stub.serveKeepstorePull()
 	srv := s.newServer(&opts)
-	bal, err := srv.runOnce()
+	bal, err := srv.runOnce(context.Background())
 	c.Check(err, check.IsNil)
 	for _, req := range collReqs.reqs {
 		c.Check(req.Form.Get("include_trash"), check.Equals, "true")
@@ -493,7 +494,7 @@ func (s *runSuite) TestCommit(c *check.C) {
 	trashReqs := s.stub.serveKeepstoreTrash()
 	pullReqs := s.stub.serveKeepstorePull()
 	srv := s.newServer(&opts)
-	bal, err := srv.runOnce()
+	bal, err := srv.runOnce(context.Background())
 	c.Check(err, check.IsNil)
 	c.Check(trashReqs.Count(), check.Equals, 8)
 	c.Check(pullReqs.Count(), check.Equals, 4)
@@ -533,13 +534,14 @@ func (s *runSuite) TestRunForever(c *check.C) {
 	trashReqs := s.stub.serveKeepstoreTrash()
 	pullReqs := s.stub.serveKeepstorePull()
 
-	stop := make(chan interface{})
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
 	s.config.Collections.BalancePeriod = arvados.Duration(time.Millisecond)
 	srv := s.newServer(&opts)
 
 	done := make(chan bool)
 	go func() {
-		srv.runForever(stop)
+		srv.runForever(ctx)
 		close(done)
 	}()
 
@@ -550,7 +552,7 @@ func (s *runSuite) TestRunForever(c *check.C) {
 	for t0 := time.Now(); pullReqs.Count() < 16 && time.Since(t0) < 10*time.Second; {
 		time.Sleep(time.Millisecond)
 	}
-	stop <- true
+	cancel()
 	<-done
 	c.Check(pullReqs.Count() >= 16, check.Equals, true)
 	c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
diff --git a/services/keep-balance/integration_test.go b/services/keep-balance/integration_test.go
index 3cfb5cded..42463a002 100644
--- a/services/keep-balance/integration_test.go
+++ b/services/keep-balance/integration_test.go
@@ -6,6 +6,7 @@ package keepbalance
 
 import (
 	"bytes"
+	"context"
 	"io"
 	"os"
 	"strings"
@@ -97,7 +98,7 @@ func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
 			Logger:  logger,
 			Metrics: newMetrics(prometheus.NewRegistry()),
 		}
-		nextOpts, err := bal.Run(s.client, s.config, opts)
+		nextOpts, err := bal.Run(context.Background(), s.client, s.config, opts)
 		c.Check(err, check.IsNil)
 		c.Check(nextOpts.SafeRendezvousState, check.Not(check.Equals), "")
 		c.Check(nextOpts.CommitPulls, check.Equals, true)
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
index f0b0df5bd..b016db22f 100644
--- a/services/keep-balance/main.go
+++ b/services/keep-balance/main.go
@@ -112,7 +112,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 				Routes: health.Routes{"ping": srv.CheckHealth},
 			}
 
-			go srv.run()
+			go srv.run(ctx)
 			return srv
 		}).RunCommand(prog, args, stdin, stdout, stderr)
 }
diff --git a/services/keep-balance/server.go b/services/keep-balance/server.go
index e485f5b20..fd53497f7 100644
--- a/services/keep-balance/server.go
+++ b/services/keep-balance/server.go
@@ -5,12 +5,14 @@
 package keepbalance
 
 import (
+	"context"
 	"net/http"
 	"os"
 	"os/signal"
 	"syscall"
 	"time"
 
+	"git.arvados.org/arvados.git/lib/controller/dblock"
 	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"github.com/jmoiron/sqlx"
 	"github.com/sirupsen/logrus"
@@ -62,12 +64,12 @@ func (srv *Server) Done() <-chan struct{} {
 	return nil
 }
 
-func (srv *Server) run() {
+func (srv *Server) run(ctx context.Context) {
 	var err error
 	if srv.RunOptions.Once {
-		_, err = srv.runOnce()
+		_, err = srv.runOnce(ctx)
 	} else {
-		err = srv.runForever(nil)
+		err = srv.runForever(ctx)
 	}
 	if err != nil {
 		srv.Logger.Error(err)
@@ -77,7 +79,7 @@ func (srv *Server) run() {
 	}
 }
 
-func (srv *Server) runOnce() (*Balancer, error) {
+func (srv *Server) runOnce(ctx context.Context) (*Balancer, error) {
 	bal := &Balancer{
 		DB:             srv.DB,
 		Logger:         srv.Logger,
@@ -86,13 +88,12 @@ func (srv *Server) runOnce() (*Balancer, error) {
 		LostBlocksFile: srv.Cluster.Collections.BlobMissingReport,
 	}
 	var err error
-	srv.RunOptions, err = bal.Run(srv.ArvClient, srv.Cluster, srv.RunOptions)
+	srv.RunOptions, err = bal.Run(ctx, srv.ArvClient, srv.Cluster, srv.RunOptions)
 	return bal, err
 }
 
-// RunForever runs forever, or (for testing purposes) until the given
-// stop channel is ready to receive.
-func (srv *Server) runForever(stop <-chan interface{}) error {
+// RunForever runs forever, or until ctx is cancelled.
+func (srv *Server) runForever(ctx context.Context) error {
 	logger := srv.Logger
 
 	ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
@@ -102,6 +103,10 @@ func (srv *Server) runForever(stop <-chan interface{}) error {
 	sigUSR1 := make(chan os.Signal)
 	signal.Notify(sigUSR1, syscall.SIGUSR1)
 
+	logger.Info("acquiring service lock")
+	dblock.KeepBalanceService.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return srv.DB, nil })
+	defer dblock.KeepBalanceService.Unlock()
+
 	logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod)
 
 	for {
@@ -110,7 +115,11 @@ func (srv *Server) runForever(stop <-chan interface{}) error {
 			logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
 		}
 
-		_, err := srv.runOnce()
+		if !dblock.KeepBalanceService.Check() {
+			// context canceled
+			return nil
+		}
+		_, err := srv.runOnce(ctx)
 		if err != nil {
 			logger.Print("run failed: ", err)
 		} else {
@@ -118,7 +127,7 @@ func (srv *Server) runForever(stop <-chan interface{}) error {
 		}
 
 		select {
-		case <-stop:
+		case <-ctx.Done():
 			signal.Stop(sigUSR1)
 			return nil
 		case <-ticker.C:

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list