[arvados] created: 2.1.0-2963-g1f73a01c0
git repository hosting
git at public.arvados.org
Mon Oct 17 18:36:44 UTC 2022
at 1f73a01c0d895811228025fefd430d9c40590175 (commit)
commit 1f73a01c0d895811228025fefd430d9c40590175
Author: Tom Clegg <tom at curii.com>
Date: Mon Oct 17 14:34:32 2022 -0400
18863: Move container log cleanup from rake task to controller.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 9b5547248..559b57c8c 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -1071,12 +1071,16 @@ Clusters:
LocalKeepLogsToContainerLog: none
Logging:
- # When you run the db:delete_old_container_logs task, it will find
- # containers that have been finished for at least this many seconds,
+ # Periodically (see SweepInterval) Arvados will check for
+ # containers that have been finished for at least this long,
# and delete their stdout, stderr, arv-mount, crunch-run, and
# crunchstat logs from the logs table.
MaxAge: 720h
+ # How often to delete cached log entries for finished
+ # containers (see MaxAge).
+ SweepInterval: 12h
+
# These two settings control how frequently log events are flushed to the
# database. Log lines are buffered until either crunch_log_bytes_per_event
# has been reached or crunch_log_seconds_between_events has elapsed since
diff --git a/lib/controller/dblock/dblock.go b/lib/controller/dblock/dblock.go
index 1a36822d5..a46201bb4 100644
--- a/lib/controller/dblock/dblock.go
+++ b/lib/controller/dblock/dblock.go
@@ -15,8 +15,9 @@ import (
)
var (
- TrashSweep = &DBLocker{key: 10001}
- retryDelay = 5 * time.Second
+ TrashSweep = &DBLocker{key: 10001}
+ ContainerLogSweep = &DBLocker{key: 10002}
+ retryDelay = 5 * time.Second
)
// DBLocker uses pg_advisory_lock to maintain a cluster-wide lock for
diff --git a/lib/controller/handler.go b/lib/controller/handler.go
index e9c56db4d..e1392bef9 100644
--- a/lib/controller/handler.go
+++ b/lib/controller/handler.go
@@ -155,6 +155,7 @@ func (h *Handler) setup() {
}
go h.trashSweepWorker()
+ go h.containerLogSweepWorker()
}
var errDBConnection = errors.New("database connection error")
diff --git a/lib/controller/handler_test.go b/lib/controller/handler_test.go
index 127e6c34c..0ffe0255f 100644
--- a/lib/controller/handler_test.go
+++ b/lib/controller/handler_test.go
@@ -496,6 +496,35 @@ func (s *HandlerSuite) TestTrashSweep(c *check.C) {
}
}
+func (s *HandlerSuite) TestContainerLogSweep(c *check.C) {
+ s.cluster.SystemRootToken = arvadostest.SystemRootToken
+ s.cluster.Containers.Logging.SweepInterval = arvados.Duration(time.Second / 10)
+ s.handler.CheckHealth()
+ ctx := auth.NewContext(s.ctx, &auth.Credentials{Tokens: []string{arvadostest.ActiveTokenV2}})
+ logentry, err := s.handler.federation.LogCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{
+ "object_uuid": arvadostest.CompletedContainerUUID,
+ "event_type": "stderr",
+ "properties": map[string]interface{}{
+ "text": "test trash sweep\n",
+ },
+ }})
+ c.Assert(err, check.IsNil)
+ defer s.handler.federation.LogDelete(ctx, arvados.DeleteOptions{UUID: logentry.UUID})
+ deadline := time.Now().Add(5 * time.Second)
+ for {
+ if time.Now().After(deadline) {
+ c.Log("timed out")
+ c.FailNow()
+ }
+ logentries, err := s.handler.federation.LogList(ctx, arvados.ListOptions{Filters: []arvados.Filter{{"uuid", "=", logentry.UUID}}, Limit: -1})
+ c.Assert(err, check.IsNil)
+ if len(logentries.Items) == 0 {
+ break
+ }
+ time.Sleep(time.Second / 10)
+ }
+}
+
func (s *HandlerSuite) TestLogActivity(c *check.C) {
s.cluster.SystemRootToken = arvadostest.SystemRootToken
s.cluster.Users.ActivityLoggingPeriod = arvados.Duration(24 * time.Hour)
diff --git a/lib/controller/trash.go b/lib/controller/trash.go
index 551b2f92b..37c7be485 100644
--- a/lib/controller/trash.go
+++ b/lib/controller/trash.go
@@ -5,6 +5,7 @@
package controller
import (
+ "context"
"time"
"git.arvados.org/arvados.git/lib/controller/dblock"
@@ -12,22 +13,56 @@ import (
"git.arvados.org/arvados.git/sdk/go/ctxlog"
)
-func (h *Handler) trashSweepWorker() {
- sleep := h.Cluster.Collections.TrashSweepInterval.Duration()
- logger := ctxlog.FromContext(h.BackgroundContext).WithField("worker", "trash sweep")
+func (h *Handler) periodicWorker(workerName string, interval time.Duration, locker *dblock.DBLocker, run func(context.Context) error) {
+ logger := ctxlog.FromContext(h.BackgroundContext).WithField("worker", workerName)
ctx := ctxlog.Context(h.BackgroundContext, logger)
- if sleep <= 0 {
- logger.Debugf("Collections.TrashSweepInterval is %v, not running worker", sleep)
+ if interval <= 0 {
+ logger.Debugf("interval is %v, not running worker", interval)
return
}
- dblock.TrashSweep.Lock(ctx, h.db)
- defer dblock.TrashSweep.Unlock()
- for time.Sleep(sleep); ctx.Err() == nil; time.Sleep(sleep) {
- dblock.TrashSweep.Check()
- ctx := auth.NewContext(ctx, &auth.Credentials{Tokens: []string{h.Cluster.SystemRootToken}})
- _, err := h.federation.SysTrashSweep(ctx, struct{}{})
+ locker.Lock(ctx, h.db)
+ defer locker.Unlock()
+ for time.Sleep(interval); ctx.Err() == nil; time.Sleep(interval) {
+ locker.Check()
+ err := run(ctx)
if err != nil {
- logger.WithError(err).Info("trash sweep failed")
+ logger.WithError(err).Infof("%s failed", workerName)
}
}
}
+
+func (h *Handler) trashSweepWorker() {
+ h.periodicWorker("trash sweep", h.Cluster.Collections.TrashSweepInterval.Duration(), dblock.TrashSweep, func(ctx context.Context) error {
+ ctx = auth.NewContext(ctx, &auth.Credentials{Tokens: []string{h.Cluster.SystemRootToken}})
+ _, err := h.federation.SysTrashSweep(ctx, struct{}{})
+ return err
+ })
+}
+
+func (h *Handler) containerLogSweepWorker() {
+ h.periodicWorker("container log sweep", h.Cluster.Containers.Logging.SweepInterval.Duration(), dblock.ContainerLogSweep, func(ctx context.Context) error {
+ db, err := h.db(ctx)
+ if err != nil {
+ return err
+ }
+ res, err := db.ExecContext(ctx, `
+DELETE FROM logs
+ USING containers
+ WHERE logs.object_uuid=containers.uuid
+ AND logs.event_type in ('stdout', 'stderr', 'arv-mount', 'crunch-run', 'crunchstat')
+ AND containers.log IS NOT NULL
+ AND now() - containers.finished_at > $1::interval`,
+ h.Cluster.Containers.Logging.MaxAge.String())
+ if err != nil {
+ return err
+ }
+ logger := ctxlog.FromContext(ctx)
+ rows, err := res.RowsAffected()
+ if err != nil {
+ logger.WithError(err).Warn("unexpected error from RowsAffected()")
+ } else {
+ logger.WithField("rows", rows).Info("deleted rows from logs table")
+ }
+ return nil
+ })
+}
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index 3c1ef4826..8705f2869 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -467,6 +467,7 @@ type ContainersConfig struct {
}
Logging struct {
MaxAge Duration
+ SweepInterval Duration
LogBytesPerEvent int
LogSecondsBetweenEvents Duration
LogThrottlePeriod Duration
diff --git a/services/api/lib/tasks/delete_old_container_logs.rake b/services/api/lib/tasks/delete_old_container_logs.rake
index 7a0ab3826..db1b3667c 100644
--- a/services/api/lib/tasks/delete_old_container_logs.rake
+++ b/services/api/lib/tasks/delete_old_container_logs.rake
@@ -8,11 +8,9 @@
# from the logs table.
namespace :db do
- desc "Remove old container log entries from the logs table"
+ desc "deprecated / no-op"
task delete_old_container_logs: :environment do
- delete_sql = "DELETE FROM logs WHERE id in (SELECT logs.id FROM logs JOIN containers ON logs.object_uuid = containers.uuid WHERE event_type IN ('stdout', 'stderr', 'arv-mount', 'crunch-run', 'crunchstat') AND containers.log IS NOT NULL AND now() - containers.finished_at > interval '#{Rails.configuration.Containers.Logging.MaxAge.to_i} seconds')"
-
- ActiveRecord::Base.connection.execute(delete_sql)
+ Rails.logger.info "this db:delete_old_container_logs rake task is no longer used"
end
end
commit 67f1f536ad087ca9976f093e1f3477cec57f4985
Author: Tom Clegg <tom at curii.com>
Date: Mon Oct 17 14:26:16 2022 -0400
18863: Add Log methods to golang API interface.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/controller/federation/conn.go b/lib/controller/federation/conn.go
index 89f68a5ef..03690af02 100644
--- a/lib/controller/federation/conn.go
+++ b/lib/controller/federation/conn.go
@@ -515,6 +515,26 @@ func (conn *Conn) LinkDelete(ctx context.Context, options arvados.DeleteOptions)
return conn.chooseBackend(options.UUID).LinkDelete(ctx, options)
}
+func (conn *Conn) LogCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Log, error) {
+ return conn.chooseBackend(options.ClusterID).LogCreate(ctx, options)
+}
+
+func (conn *Conn) LogUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Log, error) {
+ return conn.chooseBackend(options.UUID).LogUpdate(ctx, options)
+}
+
+func (conn *Conn) LogGet(ctx context.Context, options arvados.GetOptions) (arvados.Log, error) {
+ return conn.chooseBackend(options.UUID).LogGet(ctx, options)
+}
+
+func (conn *Conn) LogList(ctx context.Context, options arvados.ListOptions) (arvados.LogList, error) {
+ return conn.generated_LogList(ctx, options)
+}
+
+func (conn *Conn) LogDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Log, error) {
+ return conn.chooseBackend(options.UUID).LogDelete(ctx, options)
+}
+
func (conn *Conn) SpecimenList(ctx context.Context, options arvados.ListOptions) (arvados.SpecimenList, error) {
return conn.generated_SpecimenList(ctx, options)
}
diff --git a/lib/controller/federation/generate.go b/lib/controller/federation/generate.go
index 8af613156..86bbf9d9e 100644
--- a/lib/controller/federation/generate.go
+++ b/lib/controller/federation/generate.go
@@ -53,7 +53,7 @@ func main() {
defer out.Close()
out.Write(regexp.MustCompile(`(?ms)^.*package .*?import.*?\n\)\n`).Find(buf))
io.WriteString(out, "//\n// -- this file is auto-generated -- do not edit -- edit list.go and run \"go generate\" instead --\n//\n\n")
- for _, t := range []string{"Container", "ContainerRequest", "Group", "Specimen", "User", "Link", "APIClientAuthorization"} {
+ for _, t := range []string{"Container", "ContainerRequest", "Group", "Specimen", "User", "Link", "Log", "APIClientAuthorization"} {
_, err := out.Write(bytes.ReplaceAll(orig, []byte("Collection"), []byte(t)))
if err != nil {
panic(err)
diff --git a/lib/controller/federation/generated.go b/lib/controller/federation/generated.go
index 66f36161d..637a1ce91 100755
--- a/lib/controller/federation/generated.go
+++ b/lib/controller/federation/generated.go
@@ -263,6 +263,47 @@ func (conn *Conn) generated_LinkList(ctx context.Context, options arvados.ListOp
return merged, err
}
+func (conn *Conn) generated_LogList(ctx context.Context, options arvados.ListOptions) (arvados.LogList, error) {
+ var mtx sync.Mutex
+ var merged arvados.LogList
+ var needSort atomic.Value
+ needSort.Store(false)
+ err := conn.splitListRequest(ctx, options, func(ctx context.Context, _ string, backend arvados.API, options arvados.ListOptions) ([]string, error) {
+ options.ForwardedFor = conn.cluster.ClusterID + "-" + options.ForwardedFor
+ cl, err := backend.LogList(ctx, options)
+ if err != nil {
+ return nil, err
+ }
+ mtx.Lock()
+ defer mtx.Unlock()
+ if len(merged.Items) == 0 {
+ merged = cl
+ } else if len(cl.Items) > 0 {
+ merged.Items = append(merged.Items, cl.Items...)
+ needSort.Store(true)
+ }
+ uuids := make([]string, 0, len(cl.Items))
+ for _, item := range cl.Items {
+ uuids = append(uuids, item.UUID)
+ }
+ return uuids, nil
+ })
+ if needSort.Load().(bool) {
+ // Apply the default/implied order, "modified_at desc"
+ sort.Slice(merged.Items, func(i, j int) bool {
+ mi, mj := merged.Items[i].ModifiedAt, merged.Items[j].ModifiedAt
+ return mj.Before(mi)
+ })
+ }
+ if merged.Items == nil {
+ // Return empty results as [], not null
+ // (https://github.com/golang/go/issues/27589 might be
+ // a better solution in the future)
+ merged.Items = []arvados.Log{}
+ }
+ return merged, err
+}
+
func (conn *Conn) generated_APIClientAuthorizationList(ctx context.Context, options arvados.ListOptions) (arvados.APIClientAuthorizationList, error) {
var mtx sync.Mutex
var merged arvados.APIClientAuthorizationList
diff --git a/lib/controller/router/router.go b/lib/controller/router/router.go
index 80d5e9298..d4712558e 100644
--- a/lib/controller/router/router.go
+++ b/lib/controller/router/router.go
@@ -367,6 +367,41 @@ func (rtr *router) addRoutes() {
return rtr.backend.LinkDelete(ctx, *opts.(*arvados.DeleteOptions))
},
},
+ {
+ arvados.EndpointLogCreate,
+ func() interface{} { return &arvados.CreateOptions{} },
+ func(ctx context.Context, opts interface{}) (interface{}, error) {
+ return rtr.backend.LogCreate(ctx, *opts.(*arvados.CreateOptions))
+ },
+ },
+ {
+ arvados.EndpointLogUpdate,
+ func() interface{} { return &arvados.UpdateOptions{} },
+ func(ctx context.Context, opts interface{}) (interface{}, error) {
+ return rtr.backend.LogUpdate(ctx, *opts.(*arvados.UpdateOptions))
+ },
+ },
+ {
+ arvados.EndpointLogList,
+ func() interface{} { return &arvados.ListOptions{Limit: -1} },
+ func(ctx context.Context, opts interface{}) (interface{}, error) {
+ return rtr.backend.LogList(ctx, *opts.(*arvados.ListOptions))
+ },
+ },
+ {
+ arvados.EndpointLogGet,
+ func() interface{} { return &arvados.GetOptions{} },
+ func(ctx context.Context, opts interface{}) (interface{}, error) {
+ return rtr.backend.LogGet(ctx, *opts.(*arvados.GetOptions))
+ },
+ },
+ {
+ arvados.EndpointLogDelete,
+ func() interface{} { return &arvados.DeleteOptions{} },
+ func(ctx context.Context, opts interface{}) (interface{}, error) {
+ return rtr.backend.LogDelete(ctx, *opts.(*arvados.DeleteOptions))
+ },
+ },
{
arvados.EndpointSpecimenCreate,
func() interface{} { return &arvados.CreateOptions{} },
diff --git a/lib/controller/rpc/conn.go b/lib/controller/rpc/conn.go
index 0e532f23c..4d8a82ce4 100644
--- a/lib/controller/rpc/conn.go
+++ b/lib/controller/rpc/conn.go
@@ -559,6 +559,41 @@ func (conn *Conn) LinkDelete(ctx context.Context, options arvados.DeleteOptions)
return resp, err
}
+func (conn *Conn) LogCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Log, error) {
+ ep := arvados.EndpointLogCreate
+ var resp arvados.Log
+ err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
+ return resp, err
+}
+
+func (conn *Conn) LogUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Log, error) {
+ ep := arvados.EndpointLogUpdate
+ var resp arvados.Log
+ err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
+ return resp, err
+}
+
+func (conn *Conn) LogGet(ctx context.Context, options arvados.GetOptions) (arvados.Log, error) {
+ ep := arvados.EndpointLogGet
+ var resp arvados.Log
+ err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
+ return resp, err
+}
+
+func (conn *Conn) LogList(ctx context.Context, options arvados.ListOptions) (arvados.LogList, error) {
+ ep := arvados.EndpointLogList
+ var resp arvados.LogList
+ err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
+ return resp, err
+}
+
+func (conn *Conn) LogDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Log, error) {
+ ep := arvados.EndpointLogDelete
+ var resp arvados.Log
+ err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
+ return resp, err
+}
+
func (conn *Conn) SpecimenCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Specimen, error) {
ep := arvados.EndpointSpecimenCreate
var resp arvados.Specimen
diff --git a/sdk/go/arvados/api.go b/sdk/go/arvados/api.go
index 3797a17f5..bec387e85 100644
--- a/sdk/go/arvados/api.go
+++ b/sdk/go/arvados/api.go
@@ -70,6 +70,11 @@ var (
EndpointLinkGet = APIEndpoint{"GET", "arvados/v1/links/{uuid}", ""}
EndpointLinkList = APIEndpoint{"GET", "arvados/v1/links", ""}
EndpointLinkDelete = APIEndpoint{"DELETE", "arvados/v1/links/{uuid}", ""}
+ EndpointLogCreate = APIEndpoint{"POST", "arvados/v1/logs", "log"}
+ EndpointLogUpdate = APIEndpoint{"PATCH", "arvados/v1/logs/{uuid}", "log"}
+ EndpointLogGet = APIEndpoint{"GET", "arvados/v1/logs/{uuid}", ""}
+ EndpointLogList = APIEndpoint{"GET", "arvados/v1/logs", ""}
+ EndpointLogDelete = APIEndpoint{"DELETE", "arvados/v1/logs/{uuid}", ""}
EndpointSysTrashSweep = APIEndpoint{"POST", "sys/trash_sweep", ""}
EndpointUserActivate = APIEndpoint{"POST", "arvados/v1/users/{uuid}/activate", ""}
EndpointUserCreate = APIEndpoint{"POST", "arvados/v1/users", "user"}
@@ -284,6 +289,11 @@ type API interface {
LinkGet(ctx context.Context, options GetOptions) (Link, error)
LinkList(ctx context.Context, options ListOptions) (LinkList, error)
LinkDelete(ctx context.Context, options DeleteOptions) (Link, error)
+ LogCreate(ctx context.Context, options CreateOptions) (Log, error)
+ LogUpdate(ctx context.Context, options UpdateOptions) (Log, error)
+ LogGet(ctx context.Context, options GetOptions) (Log, error)
+ LogList(ctx context.Context, options ListOptions) (LogList, error)
+ LogDelete(ctx context.Context, options DeleteOptions) (Log, error)
SpecimenCreate(ctx context.Context, options CreateOptions) (Specimen, error)
SpecimenUpdate(ctx context.Context, options UpdateOptions) (Specimen, error)
SpecimenGet(ctx context.Context, options GetOptions) (Specimen, error)
diff --git a/sdk/go/arvados/log.go b/sdk/go/arvados/log.go
index 6f72634e5..06d7987e3 100644
--- a/sdk/go/arvados/log.go
+++ b/sdk/go/arvados/log.go
@@ -12,12 +12,15 @@ import (
type Log struct {
ID uint64 `json:"id"`
UUID string `json:"uuid"`
+ OwnerUUID string `json:"owner_uuid"`
ObjectUUID string `json:"object_uuid"`
ObjectOwnerUUID string `json:"object_owner_uuid"`
EventType string `json:"event_type"`
- EventAt *time.Time `json:"event"`
+ EventAt time.Time `json:"event"`
+ Summary string `json:"summary"`
Properties map[string]interface{} `json:"properties"`
- CreatedAt *time.Time `json:"created_at"`
+ CreatedAt time.Time `json:"created_at"`
+ ModifiedAt time.Time `json:"modified_at"`
}
// LogList is an arvados#logList resource.
diff --git a/sdk/go/arvadostest/api.go b/sdk/go/arvadostest/api.go
index d6da579d6..83efd8892 100644
--- a/sdk/go/arvadostest/api.go
+++ b/sdk/go/arvadostest/api.go
@@ -193,6 +193,26 @@ func (as *APIStub) LinkDelete(ctx context.Context, options arvados.DeleteOptions
as.appendCall(ctx, as.LinkDelete, options)
return arvados.Link{}, as.Error
}
+func (as *APIStub) LogCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Log, error) {
+ as.appendCall(ctx, as.LogCreate, options)
+ return arvados.Log{}, as.Error
+}
+func (as *APIStub) LogUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Log, error) {
+ as.appendCall(ctx, as.LogUpdate, options)
+ return arvados.Log{}, as.Error
+}
+func (as *APIStub) LogGet(ctx context.Context, options arvados.GetOptions) (arvados.Log, error) {
+ as.appendCall(ctx, as.LogGet, options)
+ return arvados.Log{}, as.Error
+}
+func (as *APIStub) LogList(ctx context.Context, options arvados.ListOptions) (arvados.LogList, error) {
+ as.appendCall(ctx, as.LogList, options)
+ return arvados.LogList{}, as.Error
+}
+func (as *APIStub) LogDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Log, error) {
+ as.appendCall(ctx, as.LogDelete, options)
+ return arvados.Log{}, as.Error
+}
func (as *APIStub) SpecimenCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Specimen, error) {
as.appendCall(ctx, as.SpecimenCreate, options)
return arvados.Specimen{}, as.Error
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list