[ARVADOS] created: 2.1.0-1610-gf23f5689e
Git user
git at public.arvados.org
Tue Nov 16 16:08:37 UTC 2021
at f23f5689eac6354eb9567c91f2ff8586e2118e92 (commit)
commit f23f5689eac6354eb9567c91f2ff8586e2118e92
Author: Tom Clegg <tom at curii.com>
Date: Tue Nov 16 11:06:23 2021 -0500
18339: Extract dblocker to a package.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/controller/dblock/dblock.go b/lib/controller/dblock/dblock.go
new file mode 100644
index 000000000..b0d348870
--- /dev/null
+++ b/lib/controller/dblock/dblock.go
@@ -0,0 +1,101 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dblock
+
+import (
+ "context"
+ "database/sql"
+ "sync"
+ "time"
+
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "github.com/jmoiron/sqlx"
+)
+
+var (
+ // TrashSweep is the package-level locker for the trash sweep
+ // task. It uses advisory lock key 10001.
+ TrashSweep = &DBLocker{key: 10001}
+ // retryDelay is how long the Lock retry loop sleeps between
+ // attempts.
+ retryDelay = 5 * time.Second
+)
+
+// DBLocker uses pg_advisory_lock to maintain a cluster-wide lock for
+// a long-running task like "do X every N seconds".
+//
+// The methods in this file access conn, ctx and getdb only while
+// holding mtx. ctx and getdb are the arguments of the most recent
+// successful Lock call; Check reuses them to re-acquire the lock
+// after a failed ping.
+type DBLocker struct {
+ // key is the argument passed to pg_advisory_lock and
+ // pg_advisory_unlock.
+ key int
+ mtx sync.Mutex
+ ctx context.Context
+ getdb func(context.Context) (*sqlx.DB, error)
+ conn *sql.Conn // != nil if advisory lock has been acquired
+}
+
+// Lock acquires the advisory lock, waiting/reconnecting if needed.
+//
+// It retries until the lock is acquired, sleeping retryDelay between
+// attempts. ctx supplies the logger and is passed to every database
+// call; getdb supplies the connection pool. On success, ctx, getdb
+// and the connection holding the lock are saved on dbl for use by
+// Check and Unlock.
+//
+// NOTE(review): the loop's only exit is a successful acquisition;
+// ctx.Err() is never checked, so this appears to keep retrying
+// after ctx is cancelled. Confirm whether that is intended.
+func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) {
+ logger := ctxlog.FromContext(ctx)
+ for ; ; time.Sleep(retryDelay) {
+ dbl.mtx.Lock()
+ if dbl.conn != nil {
+ // Already locked by another caller in this
+ // process. Wait for them to release.
+ dbl.mtx.Unlock()
+ continue
+ }
+ db, err := getdb(ctx)
+ if err != nil {
+ logger.WithError(err).Infof("error getting database pool")
+ dbl.mtx.Unlock()
+ continue
+ }
+ // Take one connection out of the pool; the lock is requested
+ // on this connection, which is kept in dbl.conn on success.
+ conn, err := db.Conn(ctx)
+ if err != nil {
+ logger.WithError(err).Info("error getting database connection")
+ dbl.mtx.Unlock()
+ continue
+ }
+ // dbl.mtx stays held while this statement runs, so concurrent
+ // Check/Unlock calls on dbl block until it returns.
+ _, err = conn.ExecContext(ctx, `SELECT pg_advisory_lock($1)`, dbl.key)
+ if err != nil {
+ logger.WithError(err).Infof("error getting pg_advisory_lock %d", dbl.key)
+ conn.Close()
+ dbl.mtx.Unlock()
+ continue
+ }
+ logger.Debugf("acquired pg_advisory_lock %d", dbl.key)
+ dbl.ctx, dbl.getdb, dbl.conn = ctx, getdb, conn
+ dbl.mtx.Unlock()
+ return
+ }
+}
+
+// Check confirms that the lock is still active (i.e., the session is
+// still alive), and re-acquires if needed. Panics if Lock is not
+// acquired first.
+//
+// Liveness is tested by pinging the saved connection with the
+// context saved by Lock. If the ping fails, the connection is
+// closed and Lock is called again with the saved ctx and getdb, so
+// Check does not return until the lock has been re-acquired.
+func (dbl *DBLocker) Check() {
+ dbl.mtx.Lock()
+ err := dbl.conn.PingContext(dbl.ctx)
+ if err == nil {
+ ctxlog.FromContext(dbl.ctx).Debugf("pg_advisory_lock %d connection still alive", dbl.key)
+ dbl.mtx.Unlock()
+ return
+ }
+ ctxlog.FromContext(dbl.ctx).WithError(err).Info("database connection ping failed")
+ // Discard the dead connection; conn == nil lets Lock proceed.
+ dbl.conn.Close()
+ dbl.conn = nil
+ // Copy the saved arguments and release mtx before calling Lock,
+ // which takes mtx itself.
+ ctx, getdb := dbl.ctx, dbl.getdb
+ dbl.mtx.Unlock()
+ dbl.Lock(ctx, getdb)
+}
+
+// Unlock releases the advisory lock, if held, and closes the
+// connection that was holding it. It does nothing when no lock is
+// held (conn == nil). A pg_advisory_unlock failure is logged; the
+// connection is closed and cleared either way.
+func (dbl *DBLocker) Unlock() {
+ dbl.mtx.Lock()
+ defer dbl.mtx.Unlock()
+ if dbl.conn != nil {
+ // Uses context.Background() rather than dbl.ctx -- presumably
+ // so the unlock is still attempted after dbl.ctx has been
+ // cancelled; TODO confirm.
+ _, err := dbl.conn.ExecContext(context.Background(), `SELECT pg_advisory_unlock($1)`, dbl.key)
+ if err != nil {
+ ctxlog.FromContext(dbl.ctx).WithError(err).Infof("error releasing pg_advisory_lock %d", dbl.key)
+ } else {
+ ctxlog.FromContext(dbl.ctx).Debugf("released pg_advisory_lock %d", dbl.key)
+ }
+ dbl.conn.Close()
+ dbl.conn = nil
+ }
+}
diff --git a/lib/controller/trash.go b/lib/controller/trash.go
new file mode 100644
index 000000000..551b2f92b
--- /dev/null
+++ b/lib/controller/trash.go
@@ -0,0 +1,33 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package controller
+
+import (
+ "time"
+
+ "git.arvados.org/arvados.git/lib/controller/dblock"
+ "git.arvados.org/arvados.git/sdk/go/auth"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+)
+
+// trashSweepWorker calls SysTrashSweep on h.federation once per
+// Collections.TrashSweepInterval, holding the dblock.TrashSweep lock
+// for as long as it runs. It returns immediately if the interval is
+// <= 0. Otherwise it loops until the context derived from
+// h.BackgroundContext reports an error, then releases the lock.
+// A failed sweep is logged and the loop continues.
+func (h *Handler) trashSweepWorker() {
+ sleep := h.Cluster.Collections.TrashSweepInterval.Duration()
+ logger := ctxlog.FromContext(h.BackgroundContext).WithField("worker", "trash sweep")
+ ctx := ctxlog.Context(h.BackgroundContext, logger)
+ if sleep <= 0 {
+ logger.Debugf("Collections.TrashSweepInterval is %v, not running worker", sleep)
+ return
+ }
+ dblock.TrashSweep.Lock(ctx, h.db)
+ defer dblock.TrashSweep.Unlock()
+ // Sleep one full interval before the first sweep and between
+ // sweeps; ctx.Err() is only checked after each sleep.
+ for time.Sleep(sleep); ctx.Err() == nil; time.Sleep(sleep) {
+ // Make sure the lock's connection is still alive (re-acquiring
+ // if not) before each sweep.
+ dblock.TrashSweep.Check()
+ // Shadows the outer ctx for this iteration only, attaching
+ // SystemRootToken as the request credentials.
+ ctx := auth.NewContext(ctx, &auth.Credentials{Tokens: []string{h.Cluster.SystemRootToken}})
+ _, err := h.federation.SysTrashSweep(ctx, struct{}{})
+ if err != nil {
+ logger.WithError(err).Info("trash sweep failed")
+ }
+ }
+}
diff --git a/lib/controller/worker.go b/lib/controller/worker.go
deleted file mode 100644
index 02f3db330..000000000
--- a/lib/controller/worker.go
+++ /dev/null
@@ -1,95 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package controller
-
-import (
- "context"
- "database/sql"
- "time"
-
- "git.arvados.org/arvados.git/sdk/go/auth"
- "git.arvados.org/arvados.git/sdk/go/ctxlog"
- "github.com/jmoiron/sqlx"
-)
-
-const (
- // lock keys should be added here with explicit values, to
- // ensure they do not get accidentally renumbered when a key
- // is added or removed.
- lockKeyTrashSweep = 10001
-)
-
-// dbLocker uses pg_advisory_lock to maintain a cluster-wide lock for
-// a long-running task like "do X every N seconds".
-type dbLocker struct {
- GetDB func(context.Context) (*sqlx.DB, error)
- LockKey int
-
- conn *sql.Conn // != nil if advisory lock is acquired
-}
-
-// Lock acquires the advisory lock the first time it is
-// called. Subsequent calls confirm that the lock is still active
-// (i.e., the session is still alive), and re-acquires if needed.
-func (dbl *dbLocker) Lock(ctx context.Context) {
- logger := ctxlog.FromContext(ctx)
- for ; ; time.Sleep(5 * time.Second) {
- if dbl.conn == nil {
- db, err := dbl.GetDB(ctx)
- if err != nil {
- logger.WithError(err).Infof("error getting database pool")
- continue
- }
- conn, err := db.Conn(ctx)
- if err != nil {
- logger.WithError(err).Info("error getting database connection")
- continue
- }
- _, err = conn.ExecContext(ctx, `SELECT pg_advisory_lock($1)`, dbl.LockKey)
- if err != nil {
- logger.WithError(err).Info("error getting lock")
- conn.Close()
- continue
- }
- dbl.conn = conn
- }
- err := dbl.conn.PingContext(ctx)
- if err != nil {
- logger.WithError(err).Info("database connection ping failed")
- dbl.conn.Close()
- dbl.conn = nil
- continue
- }
- return
- }
-}
-
-func (dbl *dbLocker) Unlock() {
- if dbl.conn != nil {
- dbl.conn.Close()
- dbl.conn = nil
- }
-}
-
-func (h *Handler) trashSweepWorker() {
- sleep := h.Cluster.Collections.TrashSweepInterval.Duration()
- logger := ctxlog.FromContext(h.BackgroundContext).WithField("worker", "trash sweep")
- ctx := ctxlog.Context(h.BackgroundContext, logger)
- if sleep <= 0 {
- logger.Debugf("Collections.TrashSweepInterval is %v, not running worker", sleep)
- return
- }
- locker := &dbLocker{GetDB: h.db, LockKey: lockKeyTrashSweep}
- locker.Lock(ctx)
- defer locker.Unlock()
- for time.Sleep(sleep); ctx.Err() == nil; time.Sleep(sleep) {
- locker.Lock(ctx)
- ctx := auth.NewContext(ctx, &auth.Credentials{Tokens: []string{h.Cluster.SystemRootToken}})
- _, err := h.federation.SysTrashSweep(ctx, struct{}{})
- if err != nil {
- logger.WithError(err).Info("trash sweep failed")
- }
- }
-}
commit fdf081b663b91c1d0af669e0224e67a47b8497a3
Author: Tom Clegg <tom at curii.com>
Date: Mon Nov 15 15:21:56 2021 -0500
18339: Call trash_sweep periodically from controller.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/controller/auth_test.go b/lib/controller/auth_test.go
index 175241146..5d477a766 100644
--- a/lib/controller/auth_test.go
+++ b/lib/controller/auth_test.go
@@ -98,7 +98,7 @@ func (s *AuthSuite) SetUpTest(c *check.C) {
cluster.Login.OpenIDConnect.AcceptAccessToken = true
cluster.Login.OpenIDConnect.AcceptAccessTokenScope = ""
- s.testHandler = &Handler{Cluster: cluster}
+ s.testHandler = &Handler{Cluster: cluster, BackgroundContext: ctxlog.Context(context.Background(), s.log)}
s.testServer = newServerFromIntegrationTestEnv(c)
s.testServer.Server.BaseContext = func(net.Listener) context.Context {
return ctxlog.Context(context.Background(), s.log)
diff --git a/lib/controller/cmd.go b/lib/controller/cmd.go
index 7ab7f5305..96972251a 100644
--- a/lib/controller/cmd.go
+++ b/lib/controller/cmd.go
@@ -16,6 +16,6 @@ import (
// Command starts a controller service. See cmd/arvados-server/cmd.go
var Command cmd.Handler = service.Command(arvados.ServiceNameController, newHandler)
-func newHandler(_ context.Context, cluster *arvados.Cluster, _ string, _ *prometheus.Registry) service.Handler {
- return &Handler{Cluster: cluster}
+func newHandler(ctx context.Context, cluster *arvados.Cluster, _ string, _ *prometheus.Registry) service.Handler {
+ return &Handler{Cluster: cluster, BackgroundContext: ctx}
}
diff --git a/lib/controller/federation/conn.go b/lib/controller/federation/conn.go
index d1bf473d7..d4155da10 100644
--- a/lib/controller/federation/conn.go
+++ b/lib/controller/federation/conn.go
@@ -525,6 +525,10 @@ func (conn *Conn) SpecimenDelete(ctx context.Context, options arvados.DeleteOpti
return conn.chooseBackend(options.UUID).SpecimenDelete(ctx, options)
}
+// SysTrashSweep passes the trash sweep request to conn.local and
+// returns its result unchanged; no other backend is called.
+func (conn *Conn) SysTrashSweep(ctx context.Context, options struct{}) (struct{}, error) {
+ return conn.local.SysTrashSweep(ctx, options)
+}
+
var userAttrsCachedFromLoginCluster = map[string]bool{
"created_at": true,
"email": true,
diff --git a/lib/controller/federation_test.go b/lib/controller/federation_test.go
index 211c76198..eb398695b 100644
--- a/lib/controller/federation_test.go
+++ b/lib/controller/federation_test.go
@@ -70,7 +70,7 @@ func (s *FederationSuite) SetUpTest(c *check.C) {
cluster.Collections.BlobSigningTTL = arvados.Duration(time.Hour * 24 * 14)
arvadostest.SetServiceURL(&cluster.Services.RailsAPI, "http://localhost:1/")
arvadostest.SetServiceURL(&cluster.Services.Controller, "http://localhost:/")
- s.testHandler = &Handler{Cluster: cluster}
+ s.testHandler = &Handler{Cluster: cluster, BackgroundContext: ctxlog.Context(context.Background(), s.log)}
s.testServer = newServerFromIntegrationTestEnv(c)
s.testServer.Server.BaseContext = func(net.Listener) context.Context {
return ctxlog.Context(context.Background(), s.log)
diff --git a/lib/controller/handler.go b/lib/controller/handler.go
index b51d90911..965ba040e 100644
--- a/lib/controller/handler.go
+++ b/lib/controller/handler.go
@@ -32,9 +32,11 @@ import (
)
type Handler struct {
- Cluster *arvados.Cluster
+ Cluster *arvados.Cluster
+ BackgroundContext context.Context
setupOnce sync.Once
+ federation *federation.Conn
handlerStack http.Handler
proxy *proxy
secureClient *http.Client
@@ -103,7 +105,8 @@ func (h *Handler) setup() {
healthFuncs := make(map[string]health.Func)
oidcAuthorizer := localdb.OIDCAccessTokenAuthorizer(h.Cluster, h.db)
- rtr := router.New(federation.New(h.Cluster, &healthFuncs), router.Config{
+ h.federation = federation.New(h.Cluster, &healthFuncs)
+ rtr := router.New(h.federation, router.Config{
MaxRequestSize: h.Cluster.API.MaxRequestSize,
WrapCalls: api.ComposeWrappers(ctrlctx.WrapCallsInTransactions(h.db), oidcAuthorizer.WrapCalls),
})
@@ -152,6 +155,8 @@ func (h *Handler) setup() {
h.proxy = &proxy{
Name: "arvados-controller",
}
+
+ go h.trashSweepWorker()
}
var errDBConnection = errors.New("database connection error")
diff --git a/lib/controller/handler_test.go b/lib/controller/handler_test.go
index f854079f9..a456627c0 100644
--- a/lib/controller/handler_test.go
+++ b/lib/controller/handler_test.go
@@ -35,7 +35,7 @@ var _ = check.Suite(&HandlerSuite{})
type HandlerSuite struct {
cluster *arvados.Cluster
- handler http.Handler
+ handler *Handler
ctx context.Context
cancel context.CancelFunc
}
@@ -51,7 +51,7 @@ func (s *HandlerSuite) SetUpTest(c *check.C) {
s.cluster.TLS.Insecure = true
arvadostest.SetServiceURL(&s.cluster.Services.RailsAPI, "https://"+os.Getenv("ARVADOS_TEST_API_HOST"))
arvadostest.SetServiceURL(&s.cluster.Services.Controller, "http://localhost:/")
- s.handler = newHandler(s.ctx, s.cluster, "", prometheus.NewRegistry())
+ s.handler = newHandler(s.ctx, s.cluster, "", prometheus.NewRegistry()).(*Handler)
}
func (s *HandlerSuite) TearDownTest(c *check.C) {
@@ -276,7 +276,7 @@ func (s *HandlerSuite) TestLogoutGoogle(c *check.C) {
func (s *HandlerSuite) TestValidateV1APIToken(c *check.C) {
req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
- user, ok, err := s.handler.(*Handler).validateAPItoken(req, arvadostest.ActiveToken)
+ user, ok, err := s.handler.validateAPItoken(req, arvadostest.ActiveToken)
c.Assert(err, check.IsNil)
c.Check(ok, check.Equals, true)
c.Check(user.Authorization.UUID, check.Equals, arvadostest.ActiveTokenUUID)
@@ -287,7 +287,7 @@ func (s *HandlerSuite) TestValidateV1APIToken(c *check.C) {
func (s *HandlerSuite) TestValidateV2APIToken(c *check.C) {
req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
- user, ok, err := s.handler.(*Handler).validateAPItoken(req, arvadostest.ActiveTokenV2)
+ user, ok, err := s.handler.validateAPItoken(req, arvadostest.ActiveTokenV2)
c.Assert(err, check.IsNil)
c.Check(ok, check.Equals, true)
c.Check(user.Authorization.UUID, check.Equals, arvadostest.ActiveTokenUUID)
@@ -319,11 +319,11 @@ func (s *HandlerSuite) TestValidateRemoteToken(c *check.C) {
func (s *HandlerSuite) TestCreateAPIToken(c *check.C) {
req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
- auth, err := s.handler.(*Handler).createAPItoken(req, arvadostest.ActiveUserUUID, nil)
+ auth, err := s.handler.createAPItoken(req, arvadostest.ActiveUserUUID, nil)
c.Assert(err, check.IsNil)
c.Check(auth.Scopes, check.DeepEquals, []string{"all"})
- user, ok, err := s.handler.(*Handler).validateAPItoken(req, auth.TokenV2())
+ user, ok, err := s.handler.validateAPItoken(req, auth.TokenV2())
c.Assert(err, check.IsNil)
c.Check(ok, check.Equals, true)
c.Check(user.Authorization.UUID, check.Equals, auth.UUID)
@@ -430,3 +430,30 @@ func (s *HandlerSuite) TestRedactRailsAPIHostFromErrors(c *check.C) {
c.Check(jresp.Errors[0], check.Matches, `.*//railsapi\.internal/arvados/v1/collections/.*: 404 Not Found.*`)
c.Check(jresp.Errors[0], check.Not(check.Matches), `(?ms).*127.0.0.1.*`)
}
+
+// TestTrashSweep checks that a collection whose trash_at time has
+// passed ends up with IsTrashed set, without the test requesting a
+// sweep itself. It creates a collection, sets trash_at to 1/10
+// second in the future directly in the database, then polls the
+// collection for up to 5 seconds.
+func (s *HandlerSuite) TestTrashSweep(c *check.C) {
+ s.cluster.SystemRootToken = arvadostest.SystemRootToken
+ s.cluster.Collections.TrashSweepInterval = arvados.Duration(time.Second / 10)
+ // NOTE(review): presumably CheckHealth triggers handler setup,
+ // which starts the trash sweep worker -- confirm.
+ s.handler.CheckHealth()
+ ctx := auth.NewContext(s.ctx, &auth.Credentials{Tokens: []string{arvadostest.ActiveTokenV2}})
+ coll, err := s.handler.federation.CollectionCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{"name": "test trash sweep"}, EnsureUniqueName: true})
+ c.Assert(err, check.IsNil)
+ defer s.handler.federation.CollectionDelete(ctx, arvados.DeleteOptions{UUID: coll.UUID})
+ db, err := s.handler.db(s.ctx)
+ c.Assert(err, check.IsNil)
+ // Set trash_at/delete_at with raw SQL rather than through the API.
+ _, err = db.ExecContext(s.ctx, `update collections set trash_at = $1, delete_at = $2 where uuid = $3`, time.Now().UTC().Add(time.Second/10), time.Now().UTC().Add(time.Hour), coll.UUID)
+ c.Assert(err, check.IsNil)
+ deadline := time.Now().Add(5 * time.Second)
+ // Poll every 1/10 second until the collection is reported as
+ // trashed, failing the test once the deadline passes.
+ for {
+ if time.Now().After(deadline) {
+ c.Log("timed out")
+ c.FailNow()
+ }
+ updated, err := s.handler.federation.CollectionGet(ctx, arvados.GetOptions{UUID: coll.UUID, IncludeTrash: true})
+ c.Assert(err, check.IsNil)
+ if updated.IsTrashed {
+ break
+ }
+ time.Sleep(time.Second / 10)
+ }
+}
diff --git a/lib/controller/rpc/conn.go b/lib/controller/rpc/conn.go
index 25f47bc3b..736ef711e 100644
--- a/lib/controller/rpc/conn.go
+++ b/lib/controller/rpc/conn.go
@@ -572,6 +572,13 @@ func (conn *Conn) SpecimenDelete(ctx context.Context, options arvados.DeleteOpti
return resp, err
}
+// SysTrashSweep issues a request to arvados.EndpointSysTrashSweep
+// via requestAndDecode and returns the decoded (empty) response
+// together with any error from the request.
+func (conn *Conn) SysTrashSweep(ctx context.Context, options struct{}) (struct{}, error) {
+ ep := arvados.EndpointSysTrashSweep
+ var resp struct{}
+ err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
+ return resp, err
+}
+
func (conn *Conn) UserCreate(ctx context.Context, options arvados.CreateOptions) (arvados.User, error) {
ep := arvados.EndpointUserCreate
var resp arvados.User
diff --git a/lib/controller/server_test.go b/lib/controller/server_test.go
index b2b3365a2..4f3d4a568 100644
--- a/lib/controller/server_test.go
+++ b/lib/controller/server_test.go
@@ -35,11 +35,14 @@ func integrationTestCluster() *arvados.Cluster {
// provided by the integration-testing environment.
func newServerFromIntegrationTestEnv(c *check.C) *httpserver.Server {
log := ctxlog.TestLogger(c)
-
- handler := &Handler{Cluster: &arvados.Cluster{
- ClusterID: "zzzzz",
- PostgreSQL: integrationTestCluster().PostgreSQL,
- }}
+ ctx := ctxlog.Context(context.Background(), log)
+ handler := &Handler{
+ Cluster: &arvados.Cluster{
+ ClusterID: "zzzzz",
+ PostgreSQL: integrationTestCluster().PostgreSQL,
+ },
+ BackgroundContext: ctx,
+ }
handler.Cluster.TLS.Insecure = true
handler.Cluster.Collections.BlobSigning = true
handler.Cluster.Collections.BlobSigningKey = arvadostest.BlobSigningKey
@@ -49,10 +52,8 @@ func newServerFromIntegrationTestEnv(c *check.C) *httpserver.Server {
srv := &httpserver.Server{
Server: http.Server{
- BaseContext: func(net.Listener) context.Context {
- return ctxlog.Context(context.Background(), log)
- },
- Handler: httpserver.AddRequestIDs(httpserver.LogRequests(handler)),
+ BaseContext: func(net.Listener) context.Context { return ctx },
+ Handler: httpserver.AddRequestIDs(httpserver.LogRequests(handler)),
},
Addr: ":",
}
diff --git a/lib/controller/worker.go b/lib/controller/worker.go
new file mode 100644
index 000000000..02f3db330
--- /dev/null
+++ b/lib/controller/worker.go
@@ -0,0 +1,95 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package controller
+
+import (
+ "context"
+ "database/sql"
+ "time"
+
+ "git.arvados.org/arvados.git/sdk/go/auth"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "github.com/jmoiron/sqlx"
+)
+
+const (
+ // lock keys should be added here with explicit values, to
+ // ensure they do not get accidentally renumbered when a key
+ // is added or removed.
+ lockKeyTrashSweep = 10001
+)
+
+// dbLocker uses pg_advisory_lock to maintain a cluster-wide lock for
+// a long-running task like "do X every N seconds".
+type dbLocker struct {
+ GetDB func(context.Context) (*sqlx.DB, error)
+ LockKey int
+
+ conn *sql.Conn // != nil if advisory lock is acquired
+}
+
+// Lock acquires the advisory lock the first time it is
+// called. Subsequent calls confirm that the lock is still active
+// (i.e., the session is still alive), and re-acquires if needed.
+func (dbl *dbLocker) Lock(ctx context.Context) {
+ logger := ctxlog.FromContext(ctx)
+ for ; ; time.Sleep(5 * time.Second) {
+ if dbl.conn == nil {
+ db, err := dbl.GetDB(ctx)
+ if err != nil {
+ logger.WithError(err).Infof("error getting database pool")
+ continue
+ }
+ conn, err := db.Conn(ctx)
+ if err != nil {
+ logger.WithError(err).Info("error getting database connection")
+ continue
+ }
+ _, err = conn.ExecContext(ctx, `SELECT pg_advisory_lock($1)`, dbl.LockKey)
+ if err != nil {
+ logger.WithError(err).Info("error getting lock")
+ conn.Close()
+ continue
+ }
+ dbl.conn = conn
+ }
+ err := dbl.conn.PingContext(ctx)
+ if err != nil {
+ logger.WithError(err).Info("database connection ping failed")
+ dbl.conn.Close()
+ dbl.conn = nil
+ continue
+ }
+ return
+ }
+}
+
+func (dbl *dbLocker) Unlock() {
+ if dbl.conn != nil {
+ dbl.conn.Close()
+ dbl.conn = nil
+ }
+}
+
+func (h *Handler) trashSweepWorker() {
+ sleep := h.Cluster.Collections.TrashSweepInterval.Duration()
+ logger := ctxlog.FromContext(h.BackgroundContext).WithField("worker", "trash sweep")
+ ctx := ctxlog.Context(h.BackgroundContext, logger)
+ if sleep <= 0 {
+ logger.Debugf("Collections.TrashSweepInterval is %v, not running worker", sleep)
+ return
+ }
+ locker := &dbLocker{GetDB: h.db, LockKey: lockKeyTrashSweep}
+ locker.Lock(ctx)
+ defer locker.Unlock()
+ for time.Sleep(sleep); ctx.Err() == nil; time.Sleep(sleep) {
+ locker.Lock(ctx)
+ ctx := auth.NewContext(ctx, &auth.Credentials{Tokens: []string{h.Cluster.SystemRootToken}})
+ _, err := h.federation.SysTrashSweep(ctx, struct{}{})
+ if err != nil {
+ logger.WithError(err).Info("trash sweep failed")
+ }
+ }
+}
diff --git a/sdk/go/arvados/api.go b/sdk/go/arvados/api.go
index 0fdc13d19..d4af0e7a8 100644
--- a/sdk/go/arvados/api.go
+++ b/sdk/go/arvados/api.go
@@ -68,6 +68,7 @@ var (
EndpointLinkGet = APIEndpoint{"GET", "arvados/v1/links/{uuid}", ""}
EndpointLinkList = APIEndpoint{"GET", "arvados/v1/links", ""}
EndpointLinkDelete = APIEndpoint{"DELETE", "arvados/v1/links/{uuid}", ""}
+ EndpointSysTrashSweep = APIEndpoint{"POST", "sys/trash_sweep", ""}
EndpointUserActivate = APIEndpoint{"POST", "arvados/v1/users/{uuid}/activate", ""}
EndpointUserCreate = APIEndpoint{"POST", "arvados/v1/users", "user"}
EndpointUserCurrent = APIEndpoint{"GET", "arvados/v1/users/current", ""}
@@ -269,6 +270,7 @@ type API interface {
SpecimenGet(ctx context.Context, options GetOptions) (Specimen, error)
SpecimenList(ctx context.Context, options ListOptions) (SpecimenList, error)
SpecimenDelete(ctx context.Context, options DeleteOptions) (Specimen, error)
+ SysTrashSweep(ctx context.Context, options struct{}) (struct{}, error)
UserCreate(ctx context.Context, options CreateOptions) (User, error)
UserUpdate(ctx context.Context, options UpdateOptions) (User, error)
UserMerge(ctx context.Context, options UserMergeOptions) (User, error)
diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go
index 13bb3bf80..5ec828667 100644
--- a/sdk/go/arvados/client.go
+++ b/sdk/go/arvados/client.go
@@ -217,6 +217,8 @@ func (c *Client) DoAndDecode(dst interface{}, req *http.Request) error {
return err
}
switch {
+ case resp.StatusCode == http.StatusNoContent:
+ return nil
case resp.StatusCode == http.StatusOK && dst == nil:
return nil
case resp.StatusCode == http.StatusOK:
diff --git a/services/api/app/controllers/sys_controller.rb b/services/api/app/controllers/sys_controller.rb
index ecc02e83d..08a672cc0 100644
--- a/services/api/app/controllers/sys_controller.rb
+++ b/services/api/app/controllers/sys_controller.rb
@@ -7,7 +7,7 @@ class SysController < ApplicationController
skip_before_action :render_404_if_no_object
before_action :admin_required
- def sweep_trash
+ def trash_sweep
act_as_system_user do
# Sweep trashed collections
Collection.
@@ -31,6 +31,7 @@ class SysController < ApplicationController
# Sweep expired tokens
ActiveRecord::Base.connection.execute("DELETE from api_client_authorizations where expires_at <= statement_timestamp()")
end
+ head :no_content
end
protected
diff --git a/services/api/config/routes.rb b/services/api/config/routes.rb
index a0c3cafe1..98f5788d6 100644
--- a/services/api/config/routes.rb
+++ b/services/api/config/routes.rb
@@ -92,7 +92,7 @@ Rails.application.routes.draw do
end
end
- post '/sys/sweep_trash', to: 'sys#sweep_trash'
+ post '/sys/trash_sweep', to: 'sys#trash_sweep'
if Rails.env == 'test'
post '/database/reset', to: 'database#reset'
commit 685db28b50225cde7dbb03aa2275f7a165d888a3
Author: Tom Clegg <tom at curii.com>
Date: Fri Nov 12 17:00:57 2021 -0500
18339: Move trash-sweep from background thread to controller action.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/services/api/lib/sweep_trashed_objects.rb b/services/api/app/controllers/sys_controller.rb
similarity index 64%
rename from services/api/lib/sweep_trashed_objects.rb
rename to services/api/app/controllers/sys_controller.rb
index c09896567..ecc02e83d 100644
--- a/services/api/lib/sweep_trashed_objects.rb
+++ b/services/api/app/controllers/sys_controller.rb
@@ -2,33 +2,12 @@
#
# SPDX-License-Identifier: AGPL-3.0
-require 'current_api_client'
+class SysController < ApplicationController
+ skip_before_action :find_object_by_uuid
+ skip_before_action :render_404_if_no_object
+ before_action :admin_required
-module SweepTrashedObjects
- extend CurrentApiClient
-
- def self.delete_project_and_contents(p_uuid)
- p = Group.find_by_uuid(p_uuid)
- if !p || p.group_class != 'project'
- raise "can't sweep group '#{p_uuid}', it may not exist or not be a project"
- end
- # First delete sub projects
- Group.where({group_class: 'project', owner_uuid: p_uuid}).each do |sub_project|
- delete_project_and_contents(sub_project.uuid)
- end
- # Next, iterate over all tables which have owner_uuid fields, with some
- # exceptions, and delete records owned by this project
- skipped_classes = ['Group', 'User']
- ActiveRecord::Base.descendants.reject(&:abstract_class?).each do |klass|
- if !skipped_classes.include?(klass.name) && klass.columns.collect(&:name).include?('owner_uuid')
- klass.where({owner_uuid: p_uuid}).destroy_all
- end
- end
- # Finally delete the project itself
- p.destroy
- end
-
- def self.sweep_now
+ def sweep_trash
act_as_system_user do
# Sweep trashed collections
Collection.
@@ -54,29 +33,26 @@ module SweepTrashedObjects
end
end
- def self.sweep_if_stale
- return if Rails.configuration.Collections.TrashSweepInterval <= 0
- exp = Rails.configuration.Collections.TrashSweepInterval.seconds
- need = false
- Rails.cache.fetch('SweepTrashedObjects', expires_in: exp) do
- need = true
+ protected
+
+ def delete_project_and_contents(p_uuid)
+ p = Group.find_by_uuid(p_uuid)
+ if !p || p.group_class != 'project'
+ raise "can't sweep group '#{p_uuid}', it may not exist or not be a project"
+ end
+ # First delete sub projects
+ Group.where({group_class: 'project', owner_uuid: p_uuid}).each do |sub_project|
+ delete_project_and_contents(sub_project.uuid)
end
- if need
- Thread.new do
- Thread.current.abort_on_exception = false
- begin
- sweep_now
- rescue => e
- Rails.logger.error "#{e.class}: #{e}\n#{e.backtrace.join("\n\t")}"
- ensure
- # Rails 5.1+ makes test threads share a database connection, so we can't
- # close a connection shared with other threads.
- # https://github.com/rails/rails/commit/deba47799ff905f778e0c98a015789a1327d5087
- if Rails.env != "test"
- ActiveRecord::Base.connection.close
- end
- end
+ # Next, iterate over all tables which have owner_uuid fields, with some
+ # exceptions, and delete records owned by this project
+ skipped_classes = ['Group', 'User']
+ ActiveRecord::Base.descendants.reject(&:abstract_class?).each do |klass|
+ if !skipped_classes.include?(klass.name) && klass.columns.collect(&:name).include?('owner_uuid')
+ klass.where({owner_uuid: p_uuid}).destroy_all
end
end
+ # Finally delete the project itself
+ p.destroy
end
end
diff --git a/services/api/app/models/collection.rb b/services/api/app/models/collection.rb
index a98cde444..b4660dbd3 100644
--- a/services/api/app/models/collection.rb
+++ b/services/api/app/models/collection.rb
@@ -3,7 +3,6 @@
# SPDX-License-Identifier: AGPL-3.0
require 'arvados/keep'
-require 'sweep_trashed_objects'
require 'trashable'
class Collection < ArvadosModel
@@ -616,11 +615,6 @@ class Collection < ArvadosModel
super - ["manifest_text", "storage_classes_desired", "storage_classes_confirmed", "current_version_uuid"]
end
- def self.where *args
- SweepTrashedObjects.sweep_if_stale
- super
- end
-
protected
# Although the defaults for these columns is already set up on the schema,
diff --git a/services/api/config/routes.rb b/services/api/config/routes.rb
index 738426b1d..a0c3cafe1 100644
--- a/services/api/config/routes.rb
+++ b/services/api/config/routes.rb
@@ -92,6 +92,8 @@ Rails.application.routes.draw do
end
end
+ post '/sys/sweep_trash', to: 'sys#sweep_trash'
+
if Rails.env == 'test'
post '/database/reset', to: 'database#reset'
end
diff --git a/services/api/test/unit/api_client_authorization_test.rb b/services/api/test/unit/api_client_authorization_test.rb
index fb90418b8..e043f8914 100644
--- a/services/api/test/unit/api_client_authorization_test.rb
+++ b/services/api/test/unit/api_client_authorization_test.rb
@@ -3,7 +3,6 @@
# SPDX-License-Identifier: AGPL-3.0
require 'test_helper'
-require 'sweep_trashed_objects'
class ApiClientAuthorizationTest < ActiveSupport::TestCase
include CurrentApiClient
@@ -20,12 +19,6 @@ class ApiClientAuthorizationTest < ActiveSupport::TestCase
end
end
- test "delete expired in SweepTrashedObjects" do
- assert_not_empty ApiClientAuthorization.where(uuid: api_client_authorizations(:expired).uuid)
- SweepTrashedObjects.sweep_now
- assert_empty ApiClientAuthorization.where(uuid: api_client_authorizations(:expired).uuid)
- end
-
test "accepts SystemRootToken" do
assert_nil ApiClientAuthorization.validate(token: "xxxSystemRootTokenxxx")
diff --git a/services/api/test/unit/collection_test.rb b/services/api/test/unit/collection_test.rb
index de0f1d360..e7134a5be 100644
--- a/services/api/test/unit/collection_test.rb
+++ b/services/api/test/unit/collection_test.rb
@@ -3,7 +3,6 @@
# SPDX-License-Identifier: AGPL-3.0
require 'test_helper'
-require 'sweep_trashed_objects'
require 'fix_collection_versions_timestamps'
class CollectionTest < ActiveSupport::TestCase
@@ -1058,60 +1057,6 @@ class CollectionTest < ActiveSupport::TestCase
assert_includes(coll_uuids, collections(:docker_image).uuid)
end
- test "move collections to trash in SweepTrashedObjects" do
- c = collections(:trashed_on_next_sweep)
- refute_empty Collection.where('uuid=? and is_trashed=false', c.uuid)
- assert_raises(ActiveRecord::RecordNotUnique) do
- act_as_user users(:active) do
- Collection.create!(owner_uuid: c.owner_uuid,
- name: c.name)
- end
- end
- SweepTrashedObjects.sweep_now
- c = Collection.where('uuid=? and is_trashed=true', c.uuid).first
- assert c
- act_as_user users(:active) do
- assert Collection.create!(owner_uuid: c.owner_uuid,
- name: c.name)
- end
- end
-
- test "delete collections in SweepTrashedObjects" do
- uuid = 'zzzzz-4zz18-3u1p5umicfpqszp' # deleted_on_next_sweep
- assert_not_empty Collection.where(uuid: uuid)
- SweepTrashedObjects.sweep_now
- assert_empty Collection.where(uuid: uuid)
- end
-
- test "delete referring links in SweepTrashedObjects" do
- uuid = collections(:trashed_on_next_sweep).uuid
- act_as_system_user do
- assert_raises ActiveRecord::RecordInvalid do
- # Cannot create because :trashed_on_next_sweep is already trashed
- Link.create!(head_uuid: uuid,
- tail_uuid: system_user_uuid,
- link_class: 'whatever',
- name: 'something')
- end
-
- # Bump trash_at to now + 1 minute
- Collection.where(uuid: uuid).
- update(trash_at: db_current_time + (1).minute)
-
- # Not considered trashed now
- Link.create!(head_uuid: uuid,
- tail_uuid: system_user_uuid,
- link_class: 'whatever',
- name: 'something')
- end
- past = db_current_time
- Collection.where(uuid: uuid).
- update_all(is_trashed: true, trash_at: past, delete_at: past)
- assert_not_empty Collection.where(uuid: uuid)
- SweepTrashedObjects.sweep_now
- assert_empty Collection.where(uuid: uuid)
- end
-
test "empty names are exempt from name uniqueness" do
act_as_user users(:active) do
c1 = Collection.new(name: nil, manifest_text: '', owner_uuid: groups(:aproject).uuid)
diff --git a/services/api/test/unit/group_test.rb b/services/api/test/unit/group_test.rb
index 017916f48..10932e116 100644
--- a/services/api/test/unit/group_test.rb
+++ b/services/api/test/unit/group_test.rb
@@ -228,50 +228,6 @@ class GroupTest < ActiveSupport::TestCase
assert User.readable_by(users(:admin)).where(uuid: u_bar.uuid).any?
end
- test "move projects to trash in SweepTrashedObjects" do
- p = groups(:trashed_on_next_sweep)
- assert_empty Group.where('uuid=? and is_trashed=true', p.uuid)
- SweepTrashedObjects.sweep_now
- assert_not_empty Group.where('uuid=? and is_trashed=true', p.uuid)
- end
-
- test "delete projects and their contents in SweepTrashedObjects" do
- g_foo = groups(:trashed_project)
- g_bar = groups(:trashed_subproject)
- g_baz = groups(:trashed_subproject3)
- col = collections(:collection_in_trashed_subproject)
- job = jobs(:job_in_trashed_project)
- cr = container_requests(:cr_in_trashed_project)
- # Save how many objects were before the sweep
- user_nr_was = User.all.length
- coll_nr_was = Collection.all.length
- group_nr_was = Group.where('group_class<>?', 'project').length
- project_nr_was = Group.where(group_class: 'project').length
- cr_nr_was = ContainerRequest.all.length
- job_nr_was = Job.all.length
- assert_not_empty Group.where(uuid: g_foo.uuid)
- assert_not_empty Group.where(uuid: g_bar.uuid)
- assert_not_empty Group.where(uuid: g_baz.uuid)
- assert_not_empty Collection.where(uuid: col.uuid)
- assert_not_empty Job.where(uuid: job.uuid)
- assert_not_empty ContainerRequest.where(uuid: cr.uuid)
- SweepTrashedObjects.sweep_now
- assert_empty Group.where(uuid: g_foo.uuid)
- assert_empty Group.where(uuid: g_bar.uuid)
- assert_empty Group.where(uuid: g_baz.uuid)
- assert_empty Collection.where(uuid: col.uuid)
- assert_empty Job.where(uuid: job.uuid)
- assert_empty ContainerRequest.where(uuid: cr.uuid)
- # No unwanted deletions should have happened
- assert_equal user_nr_was, User.all.length
- assert_equal coll_nr_was-2, # collection_in_trashed_subproject
- Collection.all.length # & deleted_on_next_sweep collections
- assert_equal group_nr_was, Group.where('group_class<>?', 'project').length
- assert_equal project_nr_was-3, Group.where(group_class: 'project').length
- assert_equal cr_nr_was-1, ContainerRequest.all.length
- assert_equal job_nr_was-1, Job.all.length
- end
-
test "project names must be displayable in a filesystem" do
set_user_from_auth :active
["", "{SOLIDUS}"].each do |subst|
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list