[arvados] updated: 2.6.0-340-gd7aa8eaa8
git repository hosting
git at public.arvados.org
Fri Jun 30 14:35:31 UTC 2023
Summary of changes:
services/keep-web/cache.go | 156 ++++++++++++++++++++++++++++-----------------
1 file changed, 99 insertions(+), 57 deletions(-)
via d7aa8eaa8c07fd7bcf219468afa733e0c27ffd98 (commit)
from 8772ee6a857973d1cac026b2185cd1e3d56a292d (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit d7aa8eaa8c07fd7bcf219468afa733e0c27ffd98
Author: Tom Clegg <tom at curii.com>
Date: Fri Jun 30 10:28:38 2023 -0400
20559: Trim session count explicitly instead of using LRU cache.
The LRU cache automatically implemented the session count limit by
dropping the oldest session, which would break our "one session per
token" rule (by evicting a session and creating a new one while the
old session was still in use) when there were more active sessions
than the configured limit.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/services/keep-web/cache.go b/services/keep-web/cache.go
index b519cbe84..604efd29d 100644
--- a/services/keep-web/cache.go
+++ b/services/keep-web/cache.go
@@ -7,13 +7,13 @@ package keepweb
import (
"errors"
"net/http"
+ "sort"
"sync"
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/keepclient"
- lru "github.com/hashicorp/golang-lru"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
@@ -25,7 +25,7 @@ type cache struct {
logger logrus.FieldLogger
registry *prometheus.Registry
metrics cacheMetrics
- sessions *lru.TwoQueueCache
+ sessions map[string]*cachedSession
setupOnce sync.Once
mtx sync.Mutex
@@ -108,9 +108,9 @@ type cachedSession struct {
// to completely remove the session entry from the cache.
mtx sync.RWMutex
// refresh must be locked in order to read or write the
- // fs/user/userLoaded fields. This mutex enables GetSession
- // and pruneSessions to remove/replace fs and user values
- // safely.
+ // fs/user/userLoaded/lastuse fields. This mutex enables
+ // GetSession and pruneSessions to remove/replace fs and user
+ // values safely.
refresh sync.Mutex
// inuse must be RLocked while the session is in use by a
// caller. This mutex enables pruneSessions() to wait for all
@@ -120,6 +120,7 @@ type cachedSession struct {
fs arvados.CustomFileSystem
user arvados.User
userLoaded bool
+ lastuse time.Time
}
func (sess *cachedSession) Release() {
@@ -133,7 +134,7 @@ func (sess *cachedSession) Release() {
func (c *cache) setup() {
var err error
- c.sessions, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxSessions)
+ c.sessions = map[string]*cachedSession{}
if err != nil {
panic(err)
}
@@ -157,8 +158,9 @@ func (c *cache) setup() {
}
func (c *cache) updateGauges() {
- c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
- c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
+ n, size := c.sessionsSize()
+ c.metrics.collectionBytes.Set(float64(size))
+ c.metrics.sessionEntries.Set(float64(n))
}
var selectPDH = map[string]interface{}{
@@ -169,8 +171,7 @@ func (c *cache) checkout(token string) (*cachedSession, error) {
c.setupOnce.Do(c.setup)
c.mtx.Lock()
defer c.mtx.Unlock()
- ent, _ := c.sessions.Get(token)
- sess, _ := ent.(*cachedSession)
+ sess := c.sessions[token]
if sess == nil {
client, err := arvados.NewClientFromConfig(c.cluster)
if err != nil {
@@ -193,7 +194,7 @@ func (c *cache) checkout(token string) (*cachedSession, error) {
arvadosclient: arvadosclient,
keepclient: keepclient.New(arvadosclient),
}
- c.sessions.Add(token, sess)
+ c.sessions[token] = sess
}
sess.mtx.RLock()
return sess, nil
@@ -212,6 +213,7 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSessi
sess.refresh.Lock()
defer sess.refresh.Unlock()
now := time.Now()
+ sess.lastuse = now
refresh := sess.expire.Before(now)
if sess.fs == nil || !sess.userLoaded || refresh {
// Wait for all active users to finish (otherwise they
@@ -248,52 +250,95 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSessi
return sess.fs, sess, &sess.user, nil
}
+type sessionSnapshot struct {
+ token string
+ sess *cachedSession
+ lastuse time.Time
+ fs arvados.CustomFileSystem
+ size int64
+ prune bool
+}
+
// Remove all expired idle session cache entries, and remove in-memory
-// filesystems until approximate remaining size <= maxsize/2
+// filesystems until approximate remaining size <= maxsize
func (c *cache) pruneSessions() {
now := time.Now()
- keys := c.sessions.Keys()
- sizes := make([]int64, len(keys))
- prune := []string(nil)
+ c.mtx.Lock()
+ snaps := make([]sessionSnapshot, 0, len(c.sessions))
+ for token, sess := range c.sessions {
+ snaps = append(snaps, sessionSnapshot{
+ token: token,
+ sess: sess,
+ })
+ }
+ c.mtx.Unlock()
+
+ // Load lastuse/fs/expire data from sessions. Note we do this
+ // after unlocking c.mtx because sess.refresh.Lock sometimes
+ // waits for another goroutine to finish "[re]fetch user
+ // record".
+ for i := range snaps {
+ snaps[i].sess.refresh.Lock()
+ snaps[i].lastuse = snaps[i].sess.lastuse
+ snaps[i].fs = snaps[i].sess.fs
+ snaps[i].prune = snaps[i].sess.expire.Before(now)
+ snaps[i].sess.refresh.Unlock()
+ }
+
+ // Sort sessions with oldest first.
+ sort.Slice(snaps, func(i, j int) bool {
+ return snaps[i].lastuse.Before(snaps[j].lastuse)
+ })
+
+ // Add up size of sessions that aren't already marked for
+ // pruning based on expire time.
var size int64
- for i, token := range keys {
- token := token.(string)
- ent, ok := c.sessions.Peek(token)
- if !ok {
- continue
+ for i, snap := range snaps {
+ if !snap.prune && snap.fs != nil {
+ fsSize := snap.fs.MemorySize()
+ snaps[i].size = fsSize
+ size += fsSize // add to the outer total; ":=" here previously shadowed it, leaving the total at 0
}
- sess := ent.(*cachedSession)
- sess.refresh.Lock()
- expired := sess.expire.Before(now)
- fs := sess.fs
- sess.refresh.Unlock()
- if expired {
- prune = append(prune, token)
+ }
+ // Mark more sessions for deletion until reaching desired
+ // memory size limit, starting with the oldest entries.
+ for i, snap := range snaps {
+ if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes {
+ break
}
- if fs != nil {
- sizes[i] = fs.MemorySize()
- size += sizes[i]
+ if snap.prune {
+ continue
+ }
+ snaps[i].prune = true
+ size -= snap.size
+ }
+
+ // Mark more sessions for deletion until reaching desired
+ // session count limit.
+ mustprune := len(snaps) - c.cluster.Collections.WebDAVCache.MaxSessions
+ for i := range snaps {
+ if snaps[i].prune {
+ mustprune--
}
}
- // Remove tokens until reaching size limit, starting with the
- // least frequently used entries (which Keys() returns last).
- for i := len(keys) - 1; i >= 0 && size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes; i-- {
- if sizes[i] > 0 {
- prune = append(prune, keys[i].(string))
- size -= sizes[i]
+ for i := range snaps {
+ if mustprune < 1 {
+ break
+ } else if !snaps[i].prune {
+ snaps[i].prune = true
+ mustprune--
}
}
c.mtx.Lock()
defer c.mtx.Unlock()
- for _, token := range prune {
- ent, ok := c.sessions.Peek(token)
- if !ok {
+ for _, snap := range snaps {
+ if !snap.prune {
continue
}
- sess := ent.(*cachedSession)
+ sess := snap.sess
if sess.mtx.TryLock() {
- c.sessions.Remove(token)
+ delete(c.sessions, snap.token)
continue
}
// We can't remove a session that's been checked out
@@ -317,31 +362,28 @@ func (c *cache) pruneSessions() {
defer sess.inuse.Unlock()
// Release memory
sess.fs = nil
- if sess.expire.Before(now) {
- // Mark user data as stale
- sess.userLoaded = false
- }
// Next GetSession will make a new fs
}()
}
}
-// collectionBytes returns the approximate combined memory size of the
-// collection cache and session filesystem cache.
-func (c *cache) collectionBytes() uint64 {
- var size uint64
- for _, token := range c.sessions.Keys() {
- ent, ok := c.sessions.Peek(token)
- if !ok {
- continue
- }
- sess := ent.(*cachedSession)
+// sessionsSize returns the number and approximate total memory size
+// of all cached sessions.
+func (c *cache) sessionsSize() (n int, size int64) {
+ c.mtx.Lock()
+ n = len(c.sessions)
+ sessions := make([]*cachedSession, 0, n)
+ for _, sess := range c.sessions {
+ sessions = append(sessions, sess)
+ }
+ c.mtx.Unlock()
+ for _, sess := range sessions {
sess.refresh.Lock()
fs := sess.fs
sess.refresh.Unlock()
if fs != nil {
- size += uint64(fs.MemorySize())
+ size += fs.MemorySize()
}
}
- return size
+ return
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list