[ARVADOS] created: 2.1.0-1610-gf23f5689e

Git user git at public.arvados.org
Tue Nov 16 16:08:37 UTC 2021


        at  f23f5689eac6354eb9567c91f2ff8586e2118e92 (commit)


commit f23f5689eac6354eb9567c91f2ff8586e2118e92
Author: Tom Clegg <tom at curii.com>
Date:   Tue Nov 16 11:06:23 2021 -0500

    18339: Extract dblocker to a package.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/controller/dblock/dblock.go b/lib/controller/dblock/dblock.go
new file mode 100644
index 000000000..b0d348870
--- /dev/null
+++ b/lib/controller/dblock/dblock.go
@@ -0,0 +1,144 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dblock
+
+import (
+	"context"
+	"database/sql"
+	"sync"
+	"time"
+
+	"git.arvados.org/arvados.git/sdk/go/ctxlog"
+	"github.com/jmoiron/sqlx"
+)
+
+var (
+	// TrashSweep ensures only one process in the cluster runs the
+	// periodic trash sweep at a time. Lock keys are given explicit
+	// values so they do not get renumbered when a key is added or
+	// removed.
+	TrashSweep = &DBLocker{key: 10001}
+
+	// retryDelay is the pause between attempts to acquire a lock.
+	retryDelay = 5 * time.Second
+)
+
+// DBLocker uses pg_advisory_lock to maintain a cluster-wide lock for
+// a long-running task like "do X every N seconds".
+//
+// A pg_advisory_lock belongs to a database session, so a DBLocker
+// keeps a dedicated connection open for as long as it holds the lock.
+type DBLocker struct {
+	key   int
+	mtx   sync.Mutex
+	ctx   context.Context
+	getdb func(context.Context) (*sqlx.DB, error)
+	conn  *sql.Conn // != nil if advisory lock has been acquired
+}
+
+// Lock acquires the advisory lock, waiting/reconnecting if needed.
+// Failed attempts are retried every retryDelay.
+//
+// If ctx is cancelled before the lock is acquired, Lock returns
+// without holding the lock, so callers should check ctx.Err() before
+// relying on it. Unlock is still safe to call in that case.
+func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) {
+	for !dbl.tryLock(ctx, getdb) {
+		select {
+		case <-ctx.Done():
+			ctxlog.FromContext(ctx).WithError(ctx.Err()).Debugf("gave up waiting for pg_advisory_lock %d", dbl.key)
+			return
+		case <-time.After(retryDelay):
+		}
+	}
+}
+
+// tryLock makes one attempt to acquire the advisory lock and reports
+// whether it succeeded. It fails without touching the database if
+// another caller in this process already holds the lock.
+func (dbl *DBLocker) tryLock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) bool {
+	logger := ctxlog.FromContext(ctx)
+	dbl.mtx.Lock()
+	defer dbl.mtx.Unlock()
+	if dbl.conn != nil {
+		// Already locked by another caller in this
+		// process. Wait for them to release.
+		return false
+	}
+	db, err := getdb(ctx)
+	if err != nil {
+		logger.WithError(err).Infof("error getting database pool")
+		return false
+	}
+	conn, err := db.Conn(ctx)
+	if err != nil {
+		logger.WithError(err).Info("error getting database connection")
+		return false
+	}
+	_, err = conn.ExecContext(ctx, `SELECT pg_advisory_lock($1)`, dbl.key)
+	if err != nil {
+		logger.WithError(err).Infof("error getting pg_advisory_lock %d", dbl.key)
+		conn.Close()
+		return false
+	}
+	logger.Debugf("acquired pg_advisory_lock %d", dbl.key)
+	dbl.ctx, dbl.getdb, dbl.conn = ctx, getdb, conn
+	return true
+}
+
+// Check confirms that the lock is still active (i.e., the session is
+// still alive), and re-acquires if needed. Panics if Lock has never
+// succeeded.
+//
+// If the lock is not currently held (e.g., an earlier re-acquire gave
+// up because its context was cancelled), Check tries to acquire it
+// again instead of dereferencing a nil connection.
+func (dbl *DBLocker) Check() {
+	dbl.mtx.Lock()
+	if dbl.conn != nil {
+		err := dbl.conn.PingContext(dbl.ctx)
+		if err == nil {
+			ctxlog.FromContext(dbl.ctx).Debugf("pg_advisory_lock %d connection still alive", dbl.key)
+			dbl.mtx.Unlock()
+			return
+		}
+		ctxlog.FromContext(dbl.ctx).WithError(err).Info("database connection ping failed")
+		// The session might still be alive (e.g., if the ping
+		// failed only because dbl.ctx was cancelled), so try to
+		// release the lock explicitly before dropping the
+		// connection.
+		dbl.release()
+	}
+	ctx, getdb := dbl.ctx, dbl.getdb
+	dbl.mtx.Unlock()
+	dbl.Lock(ctx, getdb)
+}
+
+// Unlock releases the advisory lock, if it is held.
+func (dbl *DBLocker) Unlock() {
+	dbl.mtx.Lock()
+	defer dbl.mtx.Unlock()
+	if dbl.conn != nil {
+		dbl.release()
+	}
+}
+
+// release makes a best-effort attempt to release the advisory lock,
+// then closes dbl.conn. The caller must hold dbl.mtx, and dbl.conn
+// must not be nil.
+//
+// The explicit pg_advisory_unlock is needed because (*sql.Conn).Close
+// returns the session to the connection pool rather than ending it,
+// and the advisory lock belongs to the session.
+func (dbl *DBLocker) release() {
+	_, err := dbl.conn.ExecContext(context.Background(), `SELECT pg_advisory_unlock($1)`, dbl.key)
+	if err != nil {
+		ctxlog.FromContext(dbl.ctx).WithError(err).Infof("error releasing pg_advisory_lock %d", dbl.key)
+	} else {
+		ctxlog.FromContext(dbl.ctx).Debugf("released pg_advisory_lock %d", dbl.key)
+	}
+	dbl.conn.Close()
+	dbl.conn = nil
+}
diff --git a/lib/controller/trash.go b/lib/controller/trash.go
new file mode 100644
index 000000000..551b2f92b
--- /dev/null
+++ b/lib/controller/trash.go
@@ -0,0 +1,53 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package controller
+
+import (
+	"time"
+
+	"git.arvados.org/arvados.git/lib/controller/dblock"
+	"git.arvados.org/arvados.git/sdk/go/auth"
+	"git.arvados.org/arvados.git/sdk/go/ctxlog"
+)
+
+// trashSweepWorker calls SysTrashSweep every
+// Collections.TrashSweepInterval while holding the cluster-wide
+// dblock.TrashSweep lock, so only one controller process sweeps at a
+// time. It returns immediately if the interval is not positive, and
+// stops soon after h.BackgroundContext is cancelled.
+func (h *Handler) trashSweepWorker() {
+	sleep := h.Cluster.Collections.TrashSweepInterval.Duration()
+	logger := ctxlog.FromContext(h.BackgroundContext).WithField("worker", "trash sweep")
+	ctx := ctxlog.Context(h.BackgroundContext, logger)
+	if sleep <= 0 {
+		logger.Debugf("Collections.TrashSweepInterval is %v, not running worker", sleep)
+		return
+	}
+	dblock.TrashSweep.Lock(ctx, h.db)
+	defer dblock.TrashSweep.Unlock()
+	// Wait a full interval after each sweep finishes, but wake up
+	// promptly if ctx is cancelled.
+	timer := time.NewTimer(sleep)
+	defer timer.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-timer.C:
+		}
+		dblock.TrashSweep.Check()
+		if ctx.Err() != nil {
+			// Shutting down; Check may have returned
+			// without re-acquiring the lock.
+			return
+		}
+		sweepCtx := auth.NewContext(ctx, &auth.Credentials{Tokens: []string{h.Cluster.SystemRootToken}})
+		_, err := h.federation.SysTrashSweep(sweepCtx, struct{}{})
+		if err != nil {
+			logger.WithError(err).Info("trash sweep failed")
+		}
+		timer.Reset(sleep)
+	}
+}
diff --git a/lib/controller/worker.go b/lib/controller/worker.go
deleted file mode 100644
index 02f3db330..000000000
--- a/lib/controller/worker.go
+++ /dev/null
@@ -1,95 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package controller
-
-import (
-	"context"
-	"database/sql"
-	"time"
-
-	"git.arvados.org/arvados.git/sdk/go/auth"
-	"git.arvados.org/arvados.git/sdk/go/ctxlog"
-	"github.com/jmoiron/sqlx"
-)
-
-const (
-	// lock keys should be added here with explicit values, to
-	// ensure they do not get accidentally renumbered when a key
-	// is added or removed.
-	lockKeyTrashSweep = 10001
-)
-
-// dbLocker uses pg_advisory_lock to maintain a cluster-wide lock for
-// a long-running task like "do X every N seconds".
-type dbLocker struct {
-	GetDB   func(context.Context) (*sqlx.DB, error)
-	LockKey int
-
-	conn *sql.Conn // != nil if advisory lock is acquired
-}
-
-// Lock acquires the advisory lock the first time it is
-// called. Subsequent calls confirm that the lock is still active
-// (i.e., the session is still alive), and re-acquires if needed.
-func (dbl *dbLocker) Lock(ctx context.Context) {
-	logger := ctxlog.FromContext(ctx)
-	for ; ; time.Sleep(5 * time.Second) {
-		if dbl.conn == nil {
-			db, err := dbl.GetDB(ctx)
-			if err != nil {
-				logger.WithError(err).Infof("error getting database pool")
-				continue
-			}
-			conn, err := db.Conn(ctx)
-			if err != nil {
-				logger.WithError(err).Info("error getting database connection")
-				continue
-			}
-			_, err = conn.ExecContext(ctx, `SELECT pg_advisory_lock($1)`, dbl.LockKey)
-			if err != nil {
-				logger.WithError(err).Info("error getting lock")
-				conn.Close()
-				continue
-			}
-			dbl.conn = conn
-		}
-		err := dbl.conn.PingContext(ctx)
-		if err != nil {
-			logger.WithError(err).Info("database connection ping failed")
-			dbl.conn.Close()
-			dbl.conn = nil
-			continue
-		}
-		return
-	}
-}
-
-func (dbl *dbLocker) Unlock() {
-	if dbl.conn != nil {
-		dbl.conn.Close()
-		dbl.conn = nil
-	}
-}
-
-func (h *Handler) trashSweepWorker() {
-	sleep := h.Cluster.Collections.TrashSweepInterval.Duration()
-	logger := ctxlog.FromContext(h.BackgroundContext).WithField("worker", "trash sweep")
-	ctx := ctxlog.Context(h.BackgroundContext, logger)
-	if sleep <= 0 {
-		logger.Debugf("Collections.TrashSweepInterval is %v, not running worker", sleep)
-		return
-	}
-	locker := &dbLocker{GetDB: h.db, LockKey: lockKeyTrashSweep}
-	locker.Lock(ctx)
-	defer locker.Unlock()
-	for time.Sleep(sleep); ctx.Err() == nil; time.Sleep(sleep) {
-		locker.Lock(ctx)
-		ctx := auth.NewContext(ctx, &auth.Credentials{Tokens: []string{h.Cluster.SystemRootToken}})
-		_, err := h.federation.SysTrashSweep(ctx, struct{}{})
-		if err != nil {
-			logger.WithError(err).Info("trash sweep failed")
-		}
-	}
-}

commit fdf081b663b91c1d0af669e0224e67a47b8497a3
Author: Tom Clegg <tom at curii.com>
Date:   Mon Nov 15 15:21:56 2021 -0500

    18339: Call trash_sweep periodically from controller.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/controller/auth_test.go b/lib/controller/auth_test.go
index 175241146..5d477a766 100644
--- a/lib/controller/auth_test.go
+++ b/lib/controller/auth_test.go
@@ -98,7 +98,7 @@ func (s *AuthSuite) SetUpTest(c *check.C) {
 	cluster.Login.OpenIDConnect.AcceptAccessToken = true
 	cluster.Login.OpenIDConnect.AcceptAccessTokenScope = ""
 
-	s.testHandler = &Handler{Cluster: cluster}
+	s.testHandler = &Handler{Cluster: cluster, BackgroundContext: ctxlog.Context(context.Background(), s.log)}
 	s.testServer = newServerFromIntegrationTestEnv(c)
 	s.testServer.Server.BaseContext = func(net.Listener) context.Context {
 		return ctxlog.Context(context.Background(), s.log)
diff --git a/lib/controller/cmd.go b/lib/controller/cmd.go
index 7ab7f5305..96972251a 100644
--- a/lib/controller/cmd.go
+++ b/lib/controller/cmd.go
@@ -16,6 +16,6 @@ import (
 // Command starts a controller service. See cmd/arvados-server/cmd.go
 var Command cmd.Handler = service.Command(arvados.ServiceNameController, newHandler)
 
-func newHandler(_ context.Context, cluster *arvados.Cluster, _ string, _ *prometheus.Registry) service.Handler {
-	return &Handler{Cluster: cluster}
+func newHandler(ctx context.Context, cluster *arvados.Cluster, _ string, _ *prometheus.Registry) service.Handler {
+	return &Handler{Cluster: cluster, BackgroundContext: ctx}
 }
diff --git a/lib/controller/federation/conn.go b/lib/controller/federation/conn.go
index d1bf473d7..d4155da10 100644
--- a/lib/controller/federation/conn.go
+++ b/lib/controller/federation/conn.go
@@ -525,6 +525,11 @@ func (conn *Conn) SpecimenDelete(ctx context.Context, options arvados.DeleteOpti
 	return conn.chooseBackend(options.UUID).SpecimenDelete(ctx, options)
 }

+// SysTrashSweep is always handled by the local cluster's backend.
+func (conn *Conn) SysTrashSweep(ctx context.Context, options struct{}) (struct{}, error) {
+	return conn.local.SysTrashSweep(ctx, options)
+}
+
 var userAttrsCachedFromLoginCluster = map[string]bool{
 	"created_at":  true,
 	"email":       true,
diff --git a/lib/controller/federation_test.go b/lib/controller/federation_test.go
index 211c76198..eb398695b 100644
--- a/lib/controller/federation_test.go
+++ b/lib/controller/federation_test.go
@@ -70,7 +70,7 @@ func (s *FederationSuite) SetUpTest(c *check.C) {
 	cluster.Collections.BlobSigningTTL = arvados.Duration(time.Hour * 24 * 14)
 	arvadostest.SetServiceURL(&cluster.Services.RailsAPI, "http://localhost:1/")
 	arvadostest.SetServiceURL(&cluster.Services.Controller, "http://localhost:/")
-	s.testHandler = &Handler{Cluster: cluster}
+	s.testHandler = &Handler{Cluster: cluster, BackgroundContext: ctxlog.Context(context.Background(), s.log)}
 	s.testServer = newServerFromIntegrationTestEnv(c)
 	s.testServer.Server.BaseContext = func(net.Listener) context.Context {
 		return ctxlog.Context(context.Background(), s.log)
diff --git a/lib/controller/handler.go b/lib/controller/handler.go
index b51d90911..965ba040e 100644
--- a/lib/controller/handler.go
+++ b/lib/controller/handler.go
@@ -32,9 +32,11 @@ import (
 )
 
 type Handler struct {
-	Cluster *arvados.Cluster
+	Cluster           *arvados.Cluster
+	BackgroundContext context.Context
 
 	setupOnce      sync.Once
+	federation     *federation.Conn
 	handlerStack   http.Handler
 	proxy          *proxy
 	secureClient   *http.Client
@@ -103,7 +105,8 @@ func (h *Handler) setup() {
 	healthFuncs := make(map[string]health.Func)
 
 	oidcAuthorizer := localdb.OIDCAccessTokenAuthorizer(h.Cluster, h.db)
-	rtr := router.New(federation.New(h.Cluster, &healthFuncs), router.Config{
+	h.federation = federation.New(h.Cluster, &healthFuncs)
+	rtr := router.New(h.federation, router.Config{
 		MaxRequestSize: h.Cluster.API.MaxRequestSize,
 		WrapCalls:      api.ComposeWrappers(ctrlctx.WrapCallsInTransactions(h.db), oidcAuthorizer.WrapCalls),
 	})
@@ -152,6 +155,8 @@ func (h *Handler) setup() {
 	h.proxy = &proxy{
 		Name: "arvados-controller",
 	}
+
+	go h.trashSweepWorker()
 }
 
 var errDBConnection = errors.New("database connection error")
diff --git a/lib/controller/handler_test.go b/lib/controller/handler_test.go
index f854079f9..a456627c0 100644
--- a/lib/controller/handler_test.go
+++ b/lib/controller/handler_test.go
@@ -35,7 +35,7 @@ var _ = check.Suite(&HandlerSuite{})
 
 type HandlerSuite struct {
 	cluster *arvados.Cluster
-	handler http.Handler
+	handler *Handler
 	ctx     context.Context
 	cancel  context.CancelFunc
 }
@@ -51,7 +51,7 @@ func (s *HandlerSuite) SetUpTest(c *check.C) {
 	s.cluster.TLS.Insecure = true
 	arvadostest.SetServiceURL(&s.cluster.Services.RailsAPI, "https://"+os.Getenv("ARVADOS_TEST_API_HOST"))
 	arvadostest.SetServiceURL(&s.cluster.Services.Controller, "http://localhost:/")
-	s.handler = newHandler(s.ctx, s.cluster, "", prometheus.NewRegistry())
+	s.handler = newHandler(s.ctx, s.cluster, "", prometheus.NewRegistry()).(*Handler)
 }
 
 func (s *HandlerSuite) TearDownTest(c *check.C) {
@@ -276,7 +276,7 @@ func (s *HandlerSuite) TestLogoutGoogle(c *check.C) {
 
 func (s *HandlerSuite) TestValidateV1APIToken(c *check.C) {
 	req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
-	user, ok, err := s.handler.(*Handler).validateAPItoken(req, arvadostest.ActiveToken)
+	user, ok, err := s.handler.validateAPItoken(req, arvadostest.ActiveToken)
 	c.Assert(err, check.IsNil)
 	c.Check(ok, check.Equals, true)
 	c.Check(user.Authorization.UUID, check.Equals, arvadostest.ActiveTokenUUID)
@@ -287,7 +287,7 @@ func (s *HandlerSuite) TestValidateV1APIToken(c *check.C) {
 
 func (s *HandlerSuite) TestValidateV2APIToken(c *check.C) {
 	req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
-	user, ok, err := s.handler.(*Handler).validateAPItoken(req, arvadostest.ActiveTokenV2)
+	user, ok, err := s.handler.validateAPItoken(req, arvadostest.ActiveTokenV2)
 	c.Assert(err, check.IsNil)
 	c.Check(ok, check.Equals, true)
 	c.Check(user.Authorization.UUID, check.Equals, arvadostest.ActiveTokenUUID)
@@ -319,11 +319,11 @@ func (s *HandlerSuite) TestValidateRemoteToken(c *check.C) {
 
 func (s *HandlerSuite) TestCreateAPIToken(c *check.C) {
 	req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
-	auth, err := s.handler.(*Handler).createAPItoken(req, arvadostest.ActiveUserUUID, nil)
+	auth, err := s.handler.createAPItoken(req, arvadostest.ActiveUserUUID, nil)
 	c.Assert(err, check.IsNil)
 	c.Check(auth.Scopes, check.DeepEquals, []string{"all"})
 
-	user, ok, err := s.handler.(*Handler).validateAPItoken(req, auth.TokenV2())
+	user, ok, err := s.handler.validateAPItoken(req, auth.TokenV2())
 	c.Assert(err, check.IsNil)
 	c.Check(ok, check.Equals, true)
 	c.Check(user.Authorization.UUID, check.Equals, auth.UUID)
@@ -430,3 +430,32 @@ func (s *HandlerSuite) TestRedactRailsAPIHostFromErrors(c *check.C) {
 	c.Check(jresp.Errors[0], check.Matches, `.*//railsapi\.internal/arvados/v1/collections/.*: 404 Not Found.*`)
 	c.Check(jresp.Errors[0], check.Not(check.Matches), `(?ms).*127.0.0.1.*`)
 }
+
+// TestTrashSweep checks that, with a short TrashSweepInterval, the
+// controller's background worker marks a collection as trashed soon
+// after its trash_at time has passed.
+func (s *HandlerSuite) TestTrashSweep(c *check.C) {
+	s.cluster.SystemRootToken = arvadostest.SystemRootToken
+	s.cluster.Collections.TrashSweepInterval = arvados.Duration(time.Second / 10)
+	s.handler.CheckHealth()
+	ctx := auth.NewContext(s.ctx, &auth.Credentials{Tokens: []string{arvadostest.ActiveTokenV2}})
+	coll, err := s.handler.federation.CollectionCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{"name": "test trash sweep"}, EnsureUniqueName: true})
+	c.Assert(err, check.IsNil)
+	defer s.handler.federation.CollectionDelete(ctx, arvados.DeleteOptions{UUID: coll.UUID})
+	db, err := s.handler.db(s.ctx)
+	c.Assert(err, check.IsNil)
+	_, err = db.ExecContext(s.ctx, `update collections set trash_at = $1, delete_at = $2 where uuid = $3`, time.Now().UTC().Add(time.Second/10), time.Now().UTC().Add(time.Hour), coll.UUID)
+	c.Assert(err, check.IsNil)
+	deadline := time.Now().Add(5 * time.Second)
+	for {
+		if time.Now().After(deadline) {
+			c.Fatalf("timed out waiting for collection %s to be trashed", coll.UUID)
+		}
+		updated, err := s.handler.federation.CollectionGet(ctx, arvados.GetOptions{UUID: coll.UUID, IncludeTrash: true})
+		c.Assert(err, check.IsNil)
+		if updated.IsTrashed {
+			break
+		}
+		time.Sleep(time.Second / 10)
+	}
+}
diff --git a/lib/controller/rpc/conn.go b/lib/controller/rpc/conn.go
index 25f47bc3b..736ef711e 100644
--- a/lib/controller/rpc/conn.go
+++ b/lib/controller/rpc/conn.go
@@ -572,6 +572,15 @@ func (conn *Conn) SpecimenDelete(ctx context.Context, options arvados.DeleteOpti
 	return resp, err
 }

+// SysTrashSweep sends POST sys/trash_sweep (EndpointSysTrashSweep),
+// which has an empty response body on success.
+func (conn *Conn) SysTrashSweep(ctx context.Context, options struct{}) (struct{}, error) {
+	ep := arvados.EndpointSysTrashSweep
+	var resp struct{}
+	err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
+	return resp, err
+}
+
 func (conn *Conn) UserCreate(ctx context.Context, options arvados.CreateOptions) (arvados.User, error) {
 	ep := arvados.EndpointUserCreate
 	var resp arvados.User
diff --git a/lib/controller/server_test.go b/lib/controller/server_test.go
index b2b3365a2..4f3d4a568 100644
--- a/lib/controller/server_test.go
+++ b/lib/controller/server_test.go
@@ -35,11 +35,14 @@ func integrationTestCluster() *arvados.Cluster {
 // provided by the integration-testing environment.
 func newServerFromIntegrationTestEnv(c *check.C) *httpserver.Server {
 	log := ctxlog.TestLogger(c)
-
-	handler := &Handler{Cluster: &arvados.Cluster{
-		ClusterID:  "zzzzz",
-		PostgreSQL: integrationTestCluster().PostgreSQL,
-	}}
+	ctx := ctxlog.Context(context.Background(), log)
+	handler := &Handler{
+		Cluster: &arvados.Cluster{
+			ClusterID:  "zzzzz",
+			PostgreSQL: integrationTestCluster().PostgreSQL,
+		},
+		BackgroundContext: ctx,
+	}
 	handler.Cluster.TLS.Insecure = true
 	handler.Cluster.Collections.BlobSigning = true
 	handler.Cluster.Collections.BlobSigningKey = arvadostest.BlobSigningKey
@@ -49,10 +52,8 @@ func newServerFromIntegrationTestEnv(c *check.C) *httpserver.Server {
 
 	srv := &httpserver.Server{
 		Server: http.Server{
-			BaseContext: func(net.Listener) context.Context {
-				return ctxlog.Context(context.Background(), log)
-			},
-			Handler: httpserver.AddRequestIDs(httpserver.LogRequests(handler)),
+			BaseContext: func(net.Listener) context.Context { return ctx },
+			Handler:     httpserver.AddRequestIDs(httpserver.LogRequests(handler)),
 		},
 		Addr: ":",
 	}
diff --git a/lib/controller/worker.go b/lib/controller/worker.go
new file mode 100644
index 000000000..02f3db330
--- /dev/null
+++ b/lib/controller/worker.go
@@ -0,0 +1,95 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package controller
+
+import (
+	"context"
+	"database/sql"
+	"time"
+
+	"git.arvados.org/arvados.git/sdk/go/auth"
+	"git.arvados.org/arvados.git/sdk/go/ctxlog"
+	"github.com/jmoiron/sqlx"
+)
+
+const (
+	// lock keys should be added here with explicit values, to
+	// ensure they do not get accidentally renumbered when a key
+	// is added or removed.
+	lockKeyTrashSweep = 10001
+)
+
+// dbLocker uses pg_advisory_lock to maintain a cluster-wide lock for
+// a long-running task like "do X every N seconds".
+type dbLocker struct {
+	GetDB   func(context.Context) (*sqlx.DB, error)
+	LockKey int
+
+	conn *sql.Conn // != nil if advisory lock is acquired
+}
+
+// Lock acquires the advisory lock the first time it is
+// called. Subsequent calls confirm that the lock is still active
+// (i.e., the session is still alive), and re-acquires if needed.
+func (dbl *dbLocker) Lock(ctx context.Context) {
+	logger := ctxlog.FromContext(ctx)
+	for ; ; time.Sleep(5 * time.Second) {
+		if dbl.conn == nil {
+			db, err := dbl.GetDB(ctx)
+			if err != nil {
+				logger.WithError(err).Infof("error getting database pool")
+				continue
+			}
+			conn, err := db.Conn(ctx)
+			if err != nil {
+				logger.WithError(err).Info("error getting database connection")
+				continue
+			}
+			_, err = conn.ExecContext(ctx, `SELECT pg_advisory_lock($1)`, dbl.LockKey)
+			if err != nil {
+				logger.WithError(err).Info("error getting lock")
+				conn.Close()
+				continue
+			}
+			dbl.conn = conn
+		}
+		err := dbl.conn.PingContext(ctx)
+		if err != nil {
+			logger.WithError(err).Info("database connection ping failed")
+			dbl.conn.Close()
+			dbl.conn = nil
+			continue
+		}
+		return
+	}
+}
+
+func (dbl *dbLocker) Unlock() {
+	if dbl.conn != nil {
+		dbl.conn.Close()
+		dbl.conn = nil
+	}
+}
+
+func (h *Handler) trashSweepWorker() {
+	sleep := h.Cluster.Collections.TrashSweepInterval.Duration()
+	logger := ctxlog.FromContext(h.BackgroundContext).WithField("worker", "trash sweep")
+	ctx := ctxlog.Context(h.BackgroundContext, logger)
+	if sleep <= 0 {
+		logger.Debugf("Collections.TrashSweepInterval is %v, not running worker", sleep)
+		return
+	}
+	locker := &dbLocker{GetDB: h.db, LockKey: lockKeyTrashSweep}
+	locker.Lock(ctx)
+	defer locker.Unlock()
+	for time.Sleep(sleep); ctx.Err() == nil; time.Sleep(sleep) {
+		locker.Lock(ctx)
+		ctx := auth.NewContext(ctx, &auth.Credentials{Tokens: []string{h.Cluster.SystemRootToken}})
+		_, err := h.federation.SysTrashSweep(ctx, struct{}{})
+		if err != nil {
+			logger.WithError(err).Info("trash sweep failed")
+		}
+	}
+}
diff --git a/sdk/go/arvados/api.go b/sdk/go/arvados/api.go
index 0fdc13d19..d4af0e7a8 100644
--- a/sdk/go/arvados/api.go
+++ b/sdk/go/arvados/api.go
@@ -68,6 +68,7 @@ var (
 	EndpointLinkGet                       = APIEndpoint{"GET", "arvados/v1/links/{uuid}", ""}
 	EndpointLinkList                      = APIEndpoint{"GET", "arvados/v1/links", ""}
 	EndpointLinkDelete                    = APIEndpoint{"DELETE", "arvados/v1/links/{uuid}", ""}
+	EndpointSysTrashSweep                 = APIEndpoint{"POST", "sys/trash_sweep", ""}
 	EndpointUserActivate                  = APIEndpoint{"POST", "arvados/v1/users/{uuid}/activate", ""}
 	EndpointUserCreate                    = APIEndpoint{"POST", "arvados/v1/users", "user"}
 	EndpointUserCurrent                   = APIEndpoint{"GET", "arvados/v1/users/current", ""}
@@ -269,6 +270,7 @@ type API interface {
 	SpecimenGet(ctx context.Context, options GetOptions) (Specimen, error)
 	SpecimenList(ctx context.Context, options ListOptions) (SpecimenList, error)
 	SpecimenDelete(ctx context.Context, options DeleteOptions) (Specimen, error)
+	SysTrashSweep(ctx context.Context, options struct{}) (struct{}, error)
 	UserCreate(ctx context.Context, options CreateOptions) (User, error)
 	UserUpdate(ctx context.Context, options UpdateOptions) (User, error)
 	UserMerge(ctx context.Context, options UserMergeOptions) (User, error)
diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go
index 13bb3bf80..5ec828667 100644
--- a/sdk/go/arvados/client.go
+++ b/sdk/go/arvados/client.go
@@ -217,6 +217,8 @@ func (c *Client) DoAndDecode(dst interface{}, req *http.Request) error {
 		return err
 	}
 	switch {
+	case resp.StatusCode == http.StatusNoContent:
+		return nil
 	case resp.StatusCode == http.StatusOK && dst == nil:
 		return nil
 	case resp.StatusCode == http.StatusOK:
diff --git a/services/api/app/controllers/sys_controller.rb b/services/api/app/controllers/sys_controller.rb
index ecc02e83d..08a672cc0 100644
--- a/services/api/app/controllers/sys_controller.rb
+++ b/services/api/app/controllers/sys_controller.rb
@@ -7,7 +7,7 @@ class SysController < ApplicationController
   skip_before_action :render_404_if_no_object
   before_action :admin_required
 
-  def sweep_trash
+  def trash_sweep
     act_as_system_user do
       # Sweep trashed collections
       Collection.
@@ -31,6 +31,7 @@ class SysController < ApplicationController
       # Sweep expired tokens
       ActiveRecord::Base.connection.execute("DELETE from api_client_authorizations where expires_at <= statement_timestamp()")
     end
+    head :no_content
   end
 
   protected
diff --git a/services/api/config/routes.rb b/services/api/config/routes.rb
index a0c3cafe1..98f5788d6 100644
--- a/services/api/config/routes.rb
+++ b/services/api/config/routes.rb
@@ -92,7 +92,7 @@ Rails.application.routes.draw do
     end
   end
 
-  post '/sys/sweep_trash', to: 'sys#sweep_trash'
+  post '/sys/trash_sweep', to: 'sys#trash_sweep'
 
   if Rails.env == 'test'
     post '/database/reset', to: 'database#reset'

commit 685db28b50225cde7dbb03aa2275f7a165d888a3
Author: Tom Clegg <tom at curii.com>
Date:   Fri Nov 12 17:00:57 2021 -0500

    18339: Move trash-sweep from background thread to controller action.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/services/api/lib/sweep_trashed_objects.rb b/services/api/app/controllers/sys_controller.rb
similarity index 64%
rename from services/api/lib/sweep_trashed_objects.rb
rename to services/api/app/controllers/sys_controller.rb
index c09896567..ecc02e83d 100644
--- a/services/api/lib/sweep_trashed_objects.rb
+++ b/services/api/app/controllers/sys_controller.rb
@@ -2,33 +2,12 @@
 #
 # SPDX-License-Identifier: AGPL-3.0
 
-require 'current_api_client'
+class SysController < ApplicationController
+  skip_before_action :find_object_by_uuid
+  skip_before_action :render_404_if_no_object
+  before_action :admin_required
 
-module SweepTrashedObjects
-  extend CurrentApiClient
-
-  def self.delete_project_and_contents(p_uuid)
-    p = Group.find_by_uuid(p_uuid)
-    if !p || p.group_class != 'project'
-      raise "can't sweep group '#{p_uuid}', it may not exist or not be a project"
-    end
-    # First delete sub projects
-    Group.where({group_class: 'project', owner_uuid: p_uuid}).each do |sub_project|
-      delete_project_and_contents(sub_project.uuid)
-    end
-    # Next, iterate over all tables which have owner_uuid fields, with some
-    # exceptions, and delete records owned by this project
-    skipped_classes = ['Group', 'User']
-    ActiveRecord::Base.descendants.reject(&:abstract_class?).each do |klass|
-      if !skipped_classes.include?(klass.name) && klass.columns.collect(&:name).include?('owner_uuid')
-        klass.where({owner_uuid: p_uuid}).destroy_all
-      end
-    end
-    # Finally delete the project itself
-    p.destroy
-  end
-
-  def self.sweep_now
+  def sweep_trash
     act_as_system_user do
       # Sweep trashed collections
       Collection.
@@ -54,29 +33,26 @@ module SweepTrashedObjects
     end
   end
 
-  def self.sweep_if_stale
-    return if Rails.configuration.Collections.TrashSweepInterval <= 0
-    exp = Rails.configuration.Collections.TrashSweepInterval.seconds
-    need = false
-    Rails.cache.fetch('SweepTrashedObjects', expires_in: exp) do
-      need = true
+  protected
+
+  def delete_project_and_contents(p_uuid)
+    p = Group.find_by_uuid(p_uuid)
+    if !p || p.group_class != 'project'
+      raise "can't sweep group '#{p_uuid}', it may not exist or not be a project"
+    end
+    # First delete sub projects
+    Group.where({group_class: 'project', owner_uuid: p_uuid}).each do |sub_project|
+      delete_project_and_contents(sub_project.uuid)
     end
-    if need
-      Thread.new do
-        Thread.current.abort_on_exception = false
-        begin
-          sweep_now
-        rescue => e
-          Rails.logger.error "#{e.class}: #{e}\n#{e.backtrace.join("\n\t")}"
-        ensure
-          # Rails 5.1+ makes test threads share a database connection, so we can't
-          # close a connection shared with other threads.
-          # https://github.com/rails/rails/commit/deba47799ff905f778e0c98a015789a1327d5087
-          if Rails.env != "test"
-            ActiveRecord::Base.connection.close
-          end
-        end
+    # Next, iterate over all tables which have owner_uuid fields, with some
+    # exceptions, and delete records owned by this project
+    skipped_classes = ['Group', 'User']
+    ActiveRecord::Base.descendants.reject(&:abstract_class?).each do |klass|
+      if !skipped_classes.include?(klass.name) && klass.columns.collect(&:name).include?('owner_uuid')
+        klass.where({owner_uuid: p_uuid}).destroy_all
       end
     end
+    # Finally delete the project itself
+    p.destroy
   end
 end
diff --git a/services/api/app/models/collection.rb b/services/api/app/models/collection.rb
index a98cde444..b4660dbd3 100644
--- a/services/api/app/models/collection.rb
+++ b/services/api/app/models/collection.rb
@@ -3,7 +3,6 @@
 # SPDX-License-Identifier: AGPL-3.0
 
 require 'arvados/keep'
-require 'sweep_trashed_objects'
 require 'trashable'
 
 class Collection < ArvadosModel
@@ -616,11 +615,6 @@ class Collection < ArvadosModel
     super - ["manifest_text", "storage_classes_desired", "storage_classes_confirmed", "current_version_uuid"]
   end
 
-  def self.where *args
-    SweepTrashedObjects.sweep_if_stale
-    super
-  end
-
   protected
 
   # Although the defaults for these columns is already set up on the schema,
diff --git a/services/api/config/routes.rb b/services/api/config/routes.rb
index 738426b1d..a0c3cafe1 100644
--- a/services/api/config/routes.rb
+++ b/services/api/config/routes.rb
@@ -92,6 +92,8 @@ Rails.application.routes.draw do
     end
   end
 
+  post '/sys/sweep_trash', to: 'sys#sweep_trash'
+
   if Rails.env == 'test'
     post '/database/reset', to: 'database#reset'
   end
diff --git a/services/api/test/unit/api_client_authorization_test.rb b/services/api/test/unit/api_client_authorization_test.rb
index fb90418b8..e043f8914 100644
--- a/services/api/test/unit/api_client_authorization_test.rb
+++ b/services/api/test/unit/api_client_authorization_test.rb
@@ -3,7 +3,6 @@
 # SPDX-License-Identifier: AGPL-3.0
 
 require 'test_helper'
-require 'sweep_trashed_objects'
 
 class ApiClientAuthorizationTest < ActiveSupport::TestCase
   include CurrentApiClient
@@ -20,12 +19,6 @@ class ApiClientAuthorizationTest < ActiveSupport::TestCase
     end
   end
 
-  test "delete expired in SweepTrashedObjects" do
-    assert_not_empty ApiClientAuthorization.where(uuid: api_client_authorizations(:expired).uuid)
-    SweepTrashedObjects.sweep_now
-    assert_empty ApiClientAuthorization.where(uuid: api_client_authorizations(:expired).uuid)
-  end
-
   test "accepts SystemRootToken" do
     assert_nil ApiClientAuthorization.validate(token: "xxxSystemRootTokenxxx")
 
diff --git a/services/api/test/unit/collection_test.rb b/services/api/test/unit/collection_test.rb
index de0f1d360..e7134a5be 100644
--- a/services/api/test/unit/collection_test.rb
+++ b/services/api/test/unit/collection_test.rb
@@ -3,7 +3,6 @@
 # SPDX-License-Identifier: AGPL-3.0
 
 require 'test_helper'
-require 'sweep_trashed_objects'
 require 'fix_collection_versions_timestamps'
 
 class CollectionTest < ActiveSupport::TestCase
@@ -1058,60 +1057,6 @@ class CollectionTest < ActiveSupport::TestCase
     assert_includes(coll_uuids, collections(:docker_image).uuid)
   end
 
-  test "move collections to trash in SweepTrashedObjects" do
-    c = collections(:trashed_on_next_sweep)
-    refute_empty Collection.where('uuid=? and is_trashed=false', c.uuid)
-    assert_raises(ActiveRecord::RecordNotUnique) do
-      act_as_user users(:active) do
-        Collection.create!(owner_uuid: c.owner_uuid,
-                           name: c.name)
-      end
-    end
-    SweepTrashedObjects.sweep_now
-    c = Collection.where('uuid=? and is_trashed=true', c.uuid).first
-    assert c
-    act_as_user users(:active) do
-      assert Collection.create!(owner_uuid: c.owner_uuid,
-                                name: c.name)
-    end
-  end
-
-  test "delete collections in SweepTrashedObjects" do
-    uuid = 'zzzzz-4zz18-3u1p5umicfpqszp' # deleted_on_next_sweep
-    assert_not_empty Collection.where(uuid: uuid)
-    SweepTrashedObjects.sweep_now
-    assert_empty Collection.where(uuid: uuid)
-  end
-
-  test "delete referring links in SweepTrashedObjects" do
-    uuid = collections(:trashed_on_next_sweep).uuid
-    act_as_system_user do
-      assert_raises ActiveRecord::RecordInvalid do
-        # Cannot create because :trashed_on_next_sweep is already trashed
-        Link.create!(head_uuid: uuid,
-                     tail_uuid: system_user_uuid,
-                     link_class: 'whatever',
-                     name: 'something')
-      end
-
-      # Bump trash_at to now + 1 minute
-      Collection.where(uuid: uuid).
-        update(trash_at: db_current_time + (1).minute)
-
-      # Not considered trashed now
-      Link.create!(head_uuid: uuid,
-                   tail_uuid: system_user_uuid,
-                   link_class: 'whatever',
-                   name: 'something')
-    end
-    past = db_current_time
-    Collection.where(uuid: uuid).
-      update_all(is_trashed: true, trash_at: past, delete_at: past)
-    assert_not_empty Collection.where(uuid: uuid)
-    SweepTrashedObjects.sweep_now
-    assert_empty Collection.where(uuid: uuid)
-  end
-
   test "empty names are exempt from name uniqueness" do
     act_as_user users(:active) do
       c1 = Collection.new(name: nil, manifest_text: '', owner_uuid: groups(:aproject).uuid)
diff --git a/services/api/test/unit/group_test.rb b/services/api/test/unit/group_test.rb
index 017916f48..10932e116 100644
--- a/services/api/test/unit/group_test.rb
+++ b/services/api/test/unit/group_test.rb
@@ -228,50 +228,6 @@ class GroupTest < ActiveSupport::TestCase
     assert User.readable_by(users(:admin)).where(uuid:  u_bar.uuid).any?
   end
 
-  test "move projects to trash in SweepTrashedObjects" do
-    p = groups(:trashed_on_next_sweep)
-    assert_empty Group.where('uuid=? and is_trashed=true', p.uuid)
-    SweepTrashedObjects.sweep_now
-    assert_not_empty Group.where('uuid=? and is_trashed=true', p.uuid)
-  end
-
-  test "delete projects and their contents in SweepTrashedObjects" do
-    g_foo = groups(:trashed_project)
-    g_bar = groups(:trashed_subproject)
-    g_baz = groups(:trashed_subproject3)
-    col = collections(:collection_in_trashed_subproject)
-    job = jobs(:job_in_trashed_project)
-    cr = container_requests(:cr_in_trashed_project)
-    # Save how many objects were before the sweep
-    user_nr_was = User.all.length
-    coll_nr_was = Collection.all.length
-    group_nr_was = Group.where('group_class<>?', 'project').length
-    project_nr_was = Group.where(group_class: 'project').length
-    cr_nr_was = ContainerRequest.all.length
-    job_nr_was = Job.all.length
-    assert_not_empty Group.where(uuid: g_foo.uuid)
-    assert_not_empty Group.where(uuid: g_bar.uuid)
-    assert_not_empty Group.where(uuid: g_baz.uuid)
-    assert_not_empty Collection.where(uuid: col.uuid)
-    assert_not_empty Job.where(uuid: job.uuid)
-    assert_not_empty ContainerRequest.where(uuid: cr.uuid)
-    SweepTrashedObjects.sweep_now
-    assert_empty Group.where(uuid: g_foo.uuid)
-    assert_empty Group.where(uuid: g_bar.uuid)
-    assert_empty Group.where(uuid: g_baz.uuid)
-    assert_empty Collection.where(uuid: col.uuid)
-    assert_empty Job.where(uuid: job.uuid)
-    assert_empty ContainerRequest.where(uuid: cr.uuid)
-    # No unwanted deletions should have happened
-    assert_equal user_nr_was, User.all.length
-    assert_equal coll_nr_was-2,        # collection_in_trashed_subproject
-                 Collection.all.length # & deleted_on_next_sweep collections
-    assert_equal group_nr_was, Group.where('group_class<>?', 'project').length
-    assert_equal project_nr_was-3, Group.where(group_class: 'project').length
-    assert_equal cr_nr_was-1, ContainerRequest.all.length
-    assert_equal job_nr_was-1, Job.all.length
-  end
-
   test "project names must be displayable in a filesystem" do
     set_user_from_auth :active
     ["", "{SOLIDUS}"].each do |subst|

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list