[arvados] updated: 2.6.0-340-gd7aa8eaa8

git repository hosting git at public.arvados.org
Fri Jun 30 14:35:31 UTC 2023


Summary of changes:
 services/keep-web/cache.go | 156 ++++++++++++++++++++++++++++-----------------
 1 file changed, 99 insertions(+), 57 deletions(-)

       via  d7aa8eaa8c07fd7bcf219468afa733e0c27ffd98 (commit)
      from  8772ee6a857973d1cac026b2185cd1e3d56a292d (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit d7aa8eaa8c07fd7bcf219468afa733e0c27ffd98
Author: Tom Clegg <tom at curii.com>
Date:   Fri Jun 30 10:28:38 2023 -0400

    20559: Trim session count explicitly instead of using LRU cache.
    
    The LRU cache automatically enforced the session count limit by
    dropping the oldest session, which would break our "one session per
    token" rule (by evicting a session and creating a new one while the
    old session was still in use) when there were more active sessions
    than the configured limit.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
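
For context, here is a minimal standalone sketch of the approach the commit
message describes: trimming an ordinary map of sessions by last-use time
instead of relying on an LRU cache's implicit eviction, so a session that is
still checked out is never dropped and re-created under the same token. The
names here (session, trimSessions, lastUse, inUse) are illustrative only and
are not the keep-web implementation; see the actual diff below.

package main

import (
	"fmt"
	"sort"
	"time"
)

// session is a stand-in for keep-web's cachedSession; fields are illustrative.
type session struct {
	token   string
	lastUse time.Time
	inUse   bool
}

// trimSessions removes the least recently used sessions until at most limit
// remain, skipping any session that is currently in use. An LRU cache evicts
// implicitly on insert, which is what allowed an in-use session to be dropped
// and re-created under the same token.
func trimSessions(sessions map[string]*session, limit int) {
	if len(sessions) <= limit {
		return
	}
	all := make([]*session, 0, len(sessions))
	for _, s := range sessions {
		all = append(all, s)
	}
	// Oldest first.
	sort.Slice(all, func(i, j int) bool { return all[i].lastUse.Before(all[j].lastUse) })
	excess := len(all) - limit
	for _, s := range all {
		if excess < 1 {
			break
		}
		if s.inUse {
			continue // never evict a session that is still in use
		}
		delete(sessions, s.token)
		excess--
	}
}

func main() {
	now := time.Now()
	sessions := map[string]*session{
		"a": {token: "a", lastUse: now.Add(-3 * time.Hour)},
		"b": {token: "b", lastUse: now.Add(-2 * time.Hour), inUse: true},
		"c": {token: "c", lastUse: now.Add(-1 * time.Hour)},
	}
	trimSessions(sessions, 2)
	fmt.Println(len(sessions)) // 2: oldest idle session "a" pruned, in-use "b" kept
}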

diff --git a/services/keep-web/cache.go b/services/keep-web/cache.go
index b519cbe84..604efd29d 100644
--- a/services/keep-web/cache.go
+++ b/services/keep-web/cache.go
@@ -7,13 +7,13 @@ package keepweb
 import (
 	"errors"
 	"net/http"
+	"sort"
 	"sync"
 	"time"
 
 	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
 	"git.arvados.org/arvados.git/sdk/go/keepclient"
-	lru "github.com/hashicorp/golang-lru"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/sirupsen/logrus"
 )
@@ -25,7 +25,7 @@ type cache struct {
 	logger    logrus.FieldLogger
 	registry  *prometheus.Registry
 	metrics   cacheMetrics
-	sessions  *lru.TwoQueueCache
+	sessions  map[string]*cachedSession
 	setupOnce sync.Once
 	mtx       sync.Mutex
 
@@ -108,9 +108,9 @@ type cachedSession struct {
 	// to completely remove the session entry from the cache.
 	mtx sync.RWMutex
 	// refresh must be locked in order to read or write the
-	// fs/user/userLoaded fields. This mutex enables GetSession
-	// and pruneSessions to remove/replace fs and user values
-	// safely.
+	// fs/user/userLoaded/lastuse fields. This mutex enables
+	// GetSession and pruneSessions to remove/replace fs and user
+	// values safely.
 	refresh sync.Mutex
 	// inuse must be RLocked while the session is in use by a
 	// caller. This mutex enables pruneSessions() to wait for all
@@ -120,6 +120,7 @@ type cachedSession struct {
 	fs         arvados.CustomFileSystem
 	user       arvados.User
 	userLoaded bool
+	lastuse    time.Time
 }
 
 func (sess *cachedSession) Release() {
@@ -133,7 +134,7 @@ func (sess *cachedSession) Release() {
 
 func (c *cache) setup() {
 	var err error
-	c.sessions, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxSessions)
+	c.sessions = map[string]*cachedSession{}
 	if err != nil {
 		panic(err)
 	}
@@ -157,8 +158,9 @@ func (c *cache) setup() {
 }
 
 func (c *cache) updateGauges() {
-	c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
-	c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
+	n, size := c.sessionsSize()
+	c.metrics.collectionBytes.Set(float64(size))
+	c.metrics.sessionEntries.Set(float64(n))
 }
 
 var selectPDH = map[string]interface{}{
@@ -169,8 +171,7 @@ func (c *cache) checkout(token string) (*cachedSession, error) {
 	c.setupOnce.Do(c.setup)
 	c.mtx.Lock()
 	defer c.mtx.Unlock()
-	ent, _ := c.sessions.Get(token)
-	sess, _ := ent.(*cachedSession)
+	sess := c.sessions[token]
 	if sess == nil {
 		client, err := arvados.NewClientFromConfig(c.cluster)
 		if err != nil {
@@ -193,7 +194,7 @@ func (c *cache) checkout(token string) (*cachedSession, error) {
 			arvadosclient: arvadosclient,
 			keepclient:    keepclient.New(arvadosclient),
 		}
-		c.sessions.Add(token, sess)
+		c.sessions[token] = sess
 	}
 	sess.mtx.RLock()
 	return sess, nil
@@ -212,6 +213,7 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSessi
 	sess.refresh.Lock()
 	defer sess.refresh.Unlock()
 	now := time.Now()
+	sess.lastuse = now
 	refresh := sess.expire.Before(now)
 	if sess.fs == nil || !sess.userLoaded || refresh {
 		// Wait for all active users to finish (otherwise they
@@ -248,52 +250,95 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSessi
 	return sess.fs, sess, &sess.user, nil
 }
 
+type sessionSnapshot struct {
+	token   string
+	sess    *cachedSession
+	lastuse time.Time
+	fs      arvados.CustomFileSystem
+	size    int64
+	prune   bool
+}
+
 // Remove all expired idle session cache entries, and remove in-memory
-// filesystems until approximate remaining size <= maxsize/2
+// filesystems until approximate remaining size <= maxsize
 func (c *cache) pruneSessions() {
 	now := time.Now()
-	keys := c.sessions.Keys()
-	sizes := make([]int64, len(keys))
-	prune := []string(nil)
+	c.mtx.Lock()
+	snaps := make([]sessionSnapshot, 0, len(c.sessions))
+	for token, sess := range c.sessions {
+		snaps = append(snaps, sessionSnapshot{
+			token: token,
+			sess:  sess,
+		})
+	}
+	c.mtx.Unlock()
+
+	// Load lastuse/fs/expire data from sessions. Note we do this
+	// after unlocking c.mtx because sess.refresh.Lock sometimes
+	// waits for another goroutine to finish "[re]fetch user
+	// record".
+	for i := range snaps {
+		snaps[i].sess.refresh.Lock()
+		snaps[i].lastuse = snaps[i].sess.lastuse
+		snaps[i].fs = snaps[i].sess.fs
+		snaps[i].prune = snaps[i].sess.expire.Before(now)
+		snaps[i].sess.refresh.Unlock()
+	}
+
+	// Sort sessions with oldest first.
+	sort.Slice(snaps, func(i, j int) bool {
+		return snaps[i].lastuse.Before(snaps[j].lastuse)
+	})
+
+	// Add up size of sessions that aren't already marked for
+	// pruning based on expire time.
 	var size int64
-	for i, token := range keys {
-		token := token.(string)
-		ent, ok := c.sessions.Peek(token)
-		if !ok {
-			continue
+	for i, snap := range snaps {
+		if !snap.prune && snap.fs != nil {
+			sz := snap.fs.MemorySize()
+			snaps[i].size = sz
+			size += sz
 		}
-		sess := ent.(*cachedSession)
-		sess.refresh.Lock()
-		expired := sess.expire.Before(now)
-		fs := sess.fs
-		sess.refresh.Unlock()
-		if expired {
-			prune = append(prune, token)
+	}
+	// Mark more sessions for deletion until reaching desired
+	// memory size limit, starting with the oldest entries.
+	for i, snap := range snaps {
+		if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes {
+			break
 		}
-		if fs != nil {
-			sizes[i] = fs.MemorySize()
-			size += sizes[i]
+		if snap.prune {
+			continue
+		}
+		snaps[i].prune = true
+		size -= snap.size
+	}
+
+	// Mark more sessions for deletion until reaching desired
+	// session count limit.
+	mustprune := len(snaps) - c.cluster.Collections.WebDAVCache.MaxSessions
+	for i := range snaps {
+		if snaps[i].prune {
+			mustprune--
 		}
 	}
-	// Remove tokens until reaching size limit, starting with the
-	// least frequently used entries (which Keys() returns last).
-	for i := len(keys) - 1; i >= 0 && size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes; i-- {
-		if sizes[i] > 0 {
-			prune = append(prune, keys[i].(string))
-			size -= sizes[i]
+	for i := range snaps {
+		if mustprune < 1 {
+			break
+		} else if !snaps[i].prune {
+			snaps[i].prune = true
+			mustprune--
 		}
 	}
 
 	c.mtx.Lock()
 	defer c.mtx.Unlock()
-	for _, token := range prune {
-		ent, ok := c.sessions.Peek(token)
-		if !ok {
+	for _, snap := range snaps {
+		if !snap.prune {
 			continue
 		}
-		sess := ent.(*cachedSession)
+		sess := snap.sess
 		if sess.mtx.TryLock() {
-			c.sessions.Remove(token)
+			delete(c.sessions, snap.token)
 			continue
 		}
 		// We can't remove a session that's been checked out
@@ -317,31 +362,28 @@ func (c *cache) pruneSessions() {
 			defer sess.inuse.Unlock()
 			// Release memory
 			sess.fs = nil
-			if sess.expire.Before(now) {
-				// Mark user data as stale
-				sess.userLoaded = false
-			}
 			// Next GetSession will make a new fs
 		}()
 	}
 }
 
-// collectionBytes returns the approximate combined memory size of the
-// collection cache and session filesystem cache.
-func (c *cache) collectionBytes() uint64 {
-	var size uint64
-	for _, token := range c.sessions.Keys() {
-		ent, ok := c.sessions.Peek(token)
-		if !ok {
-			continue
-		}
-		sess := ent.(*cachedSession)
+// sessionsSize returns the number and approximate total memory size
+// of all cached sessions.
+func (c *cache) sessionsSize() (n int, size int64) {
+	c.mtx.Lock()
+	n = len(c.sessions)
+	sessions := make([]*cachedSession, 0, n)
+	for _, sess := range c.sessions {
+		sessions = append(sessions, sess)
+	}
+	c.mtx.Unlock()
+	for _, sess := range sessions {
 		sess.refresh.Lock()
 		fs := sess.fs
 		sess.refresh.Unlock()
 		if fs != nil {
-			size += uint64(fs.MemorySize())
+			size += fs.MemorySize()
 		}
 	}
-	return size
+	return
 }

-----------------------------------------------------------------------


hooks/post-receive
-- 

More information about the arvados-commits mailing list