[ARVADOS] created: 2.1.0-449-g2fe25e2b3
Git user
git at public.arvados.org
Tue Mar 2 21:28:44 UTC 2021
at 2fe25e2b32042098106acead136fd3064bab30e3 (commit)
commit 2fe25e2b32042098106acead136fd3064bab30e3
Author: Tom Clegg <tom at curii.com>
Date: Tue Mar 2 16:27:18 2021 -0500
16745: Reject unsupported APIs instead of mishandling.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/services/keep-web/s3.go b/services/keep-web/s3.go
index 9479b5886..620a21b88 100644
--- a/services/keep-web/s3.go
+++ b/services/keep-web/s3.go
@@ -102,6 +102,7 @@ func s3stringToSign(alg, scope, signedHeaders string, r *http.Request) (string,
normalizedURL := *r.URL
normalizedURL.RawPath = ""
normalizedURL.Path = reMultipleSlashChars.ReplaceAllString(normalizedURL.Path, "/")
+ ctxlog.FromContext(r.Context()).Infof("escapedPath %s", normalizedURL.EscapedPath())
canonicalRequest := fmt.Sprintf("%s\n%s\n%s\n%s\n%s\n%s", r.Method, normalizedURL.EscapedPath(), s3querystring(r.URL), canonicalHeaders, signedHeaders, r.Header.Get("X-Amz-Content-Sha256"))
ctxlog.FromContext(r.Context()).Debugf("s3stringToSign: canonicalRequest %s", canonicalRequest)
return fmt.Sprintf("%s\n%s\n%s\n%s", alg, r.Header.Get("X-Amz-Date"), scope, hashdigest(sha256.New(), canonicalRequest)), nil
@@ -221,6 +222,8 @@ var UnauthorizedAccess = "UnauthorizedAccess"
var InvalidRequest = "InvalidRequest"
var SignatureDoesNotMatch = "SignatureDoesNotMatch"
+var reRawQueryIndicatesAPI = regexp.MustCompile(`^[a-z]+(&|$)`)
+
// serveS3 handles r and returns true if r is a request from an S3
// client, otherwise it returns false.
func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
@@ -292,13 +295,23 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
// GetBucketLocation
w.Header().Set("Content-Type", "application/xml")
io.WriteString(w, xml.Header)
- fmt.Fprintln(w, `<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">`+h.Config.cluster.ClusterID+`</LocationConstraint>`)
+ fmt.Fprintln(w, `<LocationConstraint><LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">`+
+ h.Config.cluster.ClusterID+
+ `</LocationConstraint></LocationConstraint>`)
+ } else if reRawQueryIndicatesAPI.MatchString(r.URL.RawQuery) {
+ // GetBucketWebsite ("GET /bucketid/?website"), GetBucketTagging, etc.
+ s3ErrorResponse(w, InvalidRequest, "API not supported", r.URL.Path+"?"+r.URL.RawQuery, http.StatusBadRequest)
} else {
// ListObjects
h.s3list(bucketName, w, r, fs)
}
return true
case r.Method == http.MethodGet || r.Method == http.MethodHead:
+ if reRawQueryIndicatesAPI.MatchString(r.URL.RawQuery) {
+ // GetObjectRetention ("GET /bucketid/objectid?retention&versionID=..."), etc.
+ s3ErrorResponse(w, InvalidRequest, "API not supported", r.URL.Path+"?"+r.URL.RawQuery, http.StatusBadRequest)
+ return true
+ }
fi, err := fs.Stat(fspath)
if r.Method == "HEAD" && !objectNameGiven {
// HeadBucket
@@ -328,6 +341,11 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
http.FileServer(fs).ServeHTTP(w, &r)
return true
case r.Method == http.MethodPut:
+ if reRawQueryIndicatesAPI.MatchString(r.URL.RawQuery) {
+ // PutObjectAcl ("PUT /bucketid/objectid?acl&versionID=..."), etc.
+ s3ErrorResponse(w, InvalidRequest, "API not supported", r.URL.Path+"?"+r.URL.RawQuery, http.StatusBadRequest)
+ return true
+ }
if !objectNameGiven {
s3ErrorResponse(w, InvalidArgument, "Missing object name in PUT request.", r.URL.Path, http.StatusBadRequest)
return true
@@ -424,6 +442,11 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
w.WriteHeader(http.StatusOK)
return true
case r.Method == http.MethodDelete:
+ if reRawQueryIndicatesAPI.MatchString(r.URL.RawQuery) {
+ // DeleteObjectTagging ("DELETE /bucketid/objectid?tagging&versionID=..."), etc.
+ s3ErrorResponse(w, InvalidRequest, "API not supported", r.URL.Path+"?"+r.URL.RawQuery, http.StatusBadRequest)
+ return true
+ }
if !objectNameGiven || r.URL.Path == "/" {
s3ErrorResponse(w, InvalidArgument, "missing object name in DELETE request", r.URL.Path, http.StatusBadRequest)
return true
diff --git a/services/keep-web/s3_test.go b/services/keep-web/s3_test.go
index 4b92d4dad..e60b55c93 100644
--- a/services/keep-web/s3_test.go
+++ b/services/keep-web/s3_test.go
@@ -76,7 +76,7 @@ func (s *IntegrationSuite) s3setup(c *check.C) s3stage {
auth := aws.NewAuth(arvadostest.ActiveTokenUUID, arvadostest.ActiveToken, "", time.Now().Add(time.Hour))
region := aws.Region{
- Name: s.testServer.Addr,
+ Name: "zzzzz",
S3Endpoint: "http://" + s.testServer.Addr,
}
client := s3.New(*auth, region)
@@ -455,7 +455,7 @@ func (stage *s3stage) writeBigDirs(c *check.C, dirs int, filesPerDir int) {
}
func (s *IntegrationSuite) sign(c *check.C, req *http.Request, key, secret string) {
- scope := "20200202/region/service/aws4_request"
+ scope := "20200202/zzzzz/service/aws4_request"
signedHeaders := "date"
req.Header.Set("Date", time.Now().UTC().Format(time.RFC1123))
stringToSign, err := s3stringToSign(s3SignAlgorithm, scope, signedHeaders, req)
@@ -560,7 +560,7 @@ func (s *IntegrationSuite) TestS3NormalizeURIForSignature(c *check.C) {
{"/foo%5bbar", "/foo%5Bbar"}, // %XX must be uppercase
} {
date := time.Now().UTC().Format("20060102T150405Z")
- scope := "20200202/fakeregion/S3/aws4_request"
+ scope := "20200202/zzzzz/S3/aws4_request"
canonicalRequest := fmt.Sprintf("%s\n%s\n%s\n%s\n%s\n%s", "GET", trial.normalizedPath, "", "host:host.example.com\n", "host", "")
c.Logf("canonicalRequest %q", canonicalRequest)
expect := fmt.Sprintf("%s\n%s\n%s\n%s", s3SignAlgorithm, date, scope, hashdigest(sha256.New(), canonicalRequest))
@@ -579,6 +579,23 @@ func (s *IntegrationSuite) TestS3NormalizeURIForSignature(c *check.C) {
}
}
+func (s *IntegrationSuite) TestS3GetBucketLocation(c *check.C) {
+ stage := s.s3setup(c)
+ defer stage.teardown(c)
+ for _, bucket := range []*s3.Bucket{stage.collbucket, stage.projbucket} {
+ req, err := http.NewRequest("GET", bucket.URL("/"), nil)
+ c.Check(err, check.IsNil)
+ req.Header.Set("Authorization", "AWS "+arvadostest.ActiveTokenV2+":none")
+ req.URL.RawQuery = "location"
+ resp, err := http.DefaultClient.Do(req)
+ c.Assert(err, check.IsNil)
+ c.Check(resp.Header.Get("Content-Type"), check.Equals, "application/xml")
+ buf, err := ioutil.ReadAll(resp.Body)
+ c.Assert(err, check.IsNil)
+ c.Check(string(buf), check.Equals, "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<LocationConstraint><LocationConstraint xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">zzzzz</LocationConstraint></LocationConstraint>\n")
+ }
+}
+
func (s *IntegrationSuite) TestS3GetBucketVersioning(c *check.C) {
stage := s.s3setup(c)
defer stage.teardown(c)
@@ -596,6 +613,37 @@ func (s *IntegrationSuite) TestS3GetBucketVersioning(c *check.C) {
}
}
+func (s *IntegrationSuite) TestS3UnsupportedAPIs(c *check.C) {
+ stage := s.s3setup(c)
+ defer stage.teardown(c)
+ for _, trial := range []struct {
+ method string
+ path string
+ rawquery string
+ }{
+ {"GET", "/", "acl&versionId=1234"}, // GetBucketAcl
+ {"GET", "/foo", "acl&versionId=1234"}, // GetObjectAcl
+ {"PUT", "/", "acl"}, // PutBucketAcl
+ {"PUT", "/foo", "acl"}, // PutObjectAcl
+ {"DELETE", "/", "tagging"}, // DeleteBucketTagging
+ {"DELETE", "/foo", "tagging"}, // DeleteObjectTagging
+ } {
+ for _, bucket := range []*s3.Bucket{stage.collbucket, stage.projbucket} {
+ c.Logf("trial %v bucket %v", trial, bucket)
+ req, err := http.NewRequest(trial.method, bucket.URL(trial.path), nil)
+ c.Check(err, check.IsNil)
+ req.Header.Set("Authorization", "AWS "+arvadostest.ActiveTokenV2+":none")
+ req.URL.RawQuery = trial.rawquery
+ resp, err := http.DefaultClient.Do(req)
+ c.Assert(err, check.IsNil)
+ c.Check(resp.Header.Get("Content-Type"), check.Equals, "application/xml")
+ buf, err := ioutil.ReadAll(resp.Body)
+ c.Assert(err, check.IsNil)
+ c.Check(string(buf), check.Matches, "(?ms).*InvalidRequest.*API not supported.*")
+ }
+ }
+}
+
// If there are no CommonPrefixes entries, the CommonPrefixes XML tag
// should not appear at all.
func (s *IntegrationSuite) TestS3ListNoCommonPrefixes(c *check.C) {
commit b6131783bf7f0ca8035b1461688af09c292b8e7f
Author: Tom Clegg <tom at curii.com>
Date: Tue Mar 2 11:12:11 2021 -0500
16745: Handle GetBucketLocation API.
Previously misinterpreted as ListObjects with no delimiter, which is
very slow.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/services/keep-web/s3.go b/services/keep-web/s3.go
index d500f1e65..9479b5886 100644
--- a/services/keep-web/s3.go
+++ b/services/keep-web/s3.go
@@ -288,6 +288,11 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
w.Header().Set("Content-Type", "application/xml")
io.WriteString(w, xml.Header)
fmt.Fprintln(w, `<VersioningConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/"/>`)
+ } else if _, ok = r.URL.Query()["location"]; ok {
+ // GetBucketLocation
+ w.Header().Set("Content-Type", "application/xml")
+ io.WriteString(w, xml.Header)
+ fmt.Fprintln(w, `<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">`+h.Config.cluster.ClusterID+`</LocationConstraint>`)
} else {
// ListObjects
h.s3list(bucketName, w, r, fs)
commit 7d3775f7dcd87bb5c210e33ff099460074080749
Author: Tom Clegg <tom at curii.com>
Date: Tue Mar 2 09:55:16 2021 -0500
16745: Don't store nil in an atomic.Value (panic).
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/services/keep-web/cache.go b/services/keep-web/cache.go
index af143c77e..ec48be609 100644
--- a/services/keep-web/cache.go
+++ b/services/keep-web/cache.go
@@ -218,6 +218,7 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, error) {
now := time.Now()
ent, _ := c.sessions.Get(token)
sess, _ := ent.(*cachedSession)
+ expired := false
if sess == nil {
c.metrics.sessionMisses.Inc()
sess = &cachedSession{
@@ -226,13 +227,13 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, error) {
c.sessions.Add(token, sess)
} else if sess.expire.Before(now) {
c.metrics.sessionMisses.Inc()
- sess.fs.Store(nil)
+ expired = true
} else {
c.metrics.sessionHits.Inc()
}
go c.pruneSessions()
fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
- if fs != nil {
+ if fs != nil && !expired {
return fs, nil
}
ac, err := arvados.NewClientFromConfig(c.cluster)
commit 8c2bbecb1c09fbc3818dc1a2d73b3fda2ba68e02
Author: Tom Clegg <tom at curii.com>
Date: Mon Feb 22 21:46:27 2021 -0500
16745: Prune enough sessions to reach size limit, not all.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/services/keep-web/cache.go b/services/keep-web/cache.go
index 71e995330..af143c77e 100644
--- a/services/keep-web/cache.go
+++ b/services/keep-web/cache.go
@@ -83,7 +83,7 @@ func (m *cacheMetrics) setup(reg *prometheus.Registry) {
Namespace: "arvados",
Subsystem: "keepweb_collectioncache",
Name: "cached_manifest_bytes",
- Help: "Total size of all manifests in cache.",
+ Help: "Total size of all cached manifests and sessions.",
})
reg.MustRegister(m.collectionBytes)
m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
@@ -251,10 +251,13 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, error) {
return fs, nil
}
+// Remove all expired session cache entries, then remove more entries
+// until approximate remaining size <= maxsize/2
func (c *cache) pruneSessions() {
now := time.Now()
var size int64
- for _, token := range c.sessions.Keys() {
+ keys := c.sessions.Keys()
+ for _, token := range keys {
ent, ok := c.sessions.Peek(token)
if !ok {
continue
@@ -264,16 +267,28 @@ func (c *cache) pruneSessions() {
c.sessions.Remove(token)
continue
}
+ if fs, ok := s.fs.Load().(arvados.CustomFileSystem); ok {
+ size += fs.MemorySize()
+ }
+ }
+ // Remove tokens until reaching size limit, starting with the
+ // least frequently used entries (which Keys() returns last).
+ for i := len(keys) - 1; i >= 0; i-- {
+ token := keys[i]
+ if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
+ break
+ }
+ ent, ok := c.sessions.Peek(token)
+ if !ok {
+ continue
+ }
+ s := ent.(*cachedSession)
fs, _ := s.fs.Load().(arvados.CustomFileSystem)
if fs == nil {
continue
}
- size += fs.MemorySize()
- }
- if size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
- for _, token := range c.sessions.Keys() {
- c.sessions.Remove(token)
- }
+ c.sessions.Remove(token)
+ size -= fs.MemorySize()
}
}
commit 820fc945c069d237e515dcc1608a5661dbf7700e
Author: Tom Clegg <tom at curii.com>
Date: Mon Feb 22 11:10:10 2021 -0500
16745: Keep a SiteFileSystem alive for multiple read requests.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 68e518732..bc87b2cc7 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -523,21 +523,30 @@ Clusters:
TrustAllContent: false
# Cache parameters for WebDAV content serving:
- # * TTL: Maximum time to cache manifests and permission checks.
- # * UUIDTTL: Maximum time to cache collection state.
- # * MaxBlockEntries: Maximum number of block cache entries.
- # * MaxCollectionEntries: Maximum number of collection cache entries.
- # * MaxCollectionBytes: Approximate memory limit for collection cache.
- # * MaxPermissionEntries: Maximum number of permission cache entries.
- # * MaxUUIDEntries: Maximum number of UUID cache entries.
WebDAVCache:
+ # Time to cache manifests, permission checks, and sessions.
TTL: 300s
+
+ # Time to cache collection state.
UUIDTTL: 5s
- MaxBlockEntries: 4
+
+ # Block cache entries. Each block consumes up to 64 MiB RAM.
+ MaxBlockEntries: 4
+
+ # Collection cache entries.
MaxCollectionEntries: 1000
- MaxCollectionBytes: 100000000
+
+ # Approximate memory limit (in bytes) for collection cache.
+ MaxCollectionBytes: 100000000
+
+ # Permission cache entries.
MaxPermissionEntries: 1000
- MaxUUIDEntries: 1000
+
+ # UUID cache entries.
+ MaxUUIDEntries: 1000
+
+ # Persistent sessions.
+ MaxSessions: 100
Login:
# One of the following mechanisms (SSO, Google, PAM, LDAP, or
diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go
index 8ef787771..f0ae11aab 100644
--- a/lib/config/generated_config.go
+++ b/lib/config/generated_config.go
@@ -529,21 +529,30 @@ Clusters:
TrustAllContent: false
# Cache parameters for WebDAV content serving:
- # * TTL: Maximum time to cache manifests and permission checks.
- # * UUIDTTL: Maximum time to cache collection state.
- # * MaxBlockEntries: Maximum number of block cache entries.
- # * MaxCollectionEntries: Maximum number of collection cache entries.
- # * MaxCollectionBytes: Approximate memory limit for collection cache.
- # * MaxPermissionEntries: Maximum number of permission cache entries.
- # * MaxUUIDEntries: Maximum number of UUID cache entries.
WebDAVCache:
+ # Time to cache manifests, permission checks, and sessions.
TTL: 300s
+
+ # Time to cache collection state.
UUIDTTL: 5s
- MaxBlockEntries: 4
+
+ # Block cache entries. Each block consumes up to 64 MiB RAM.
+ MaxBlockEntries: 4
+
+ # Collection cache entries.
MaxCollectionEntries: 1000
- MaxCollectionBytes: 100000000
+
+ # Approximate memory limit (in bytes) for collection cache.
+ MaxCollectionBytes: 100000000
+
+ # Permission cache entries.
MaxPermissionEntries: 1000
- MaxUUIDEntries: 1000
+
+ # UUID cache entries.
+ MaxUUIDEntries: 1000
+
+ # Persistent sessions.
+ MaxSessions: 100
Login:
# One of the following mechanisms (SSO, Google, PAM, LDAP, or
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index 4a56c9302..074914744 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -65,6 +65,7 @@ type WebDAVCacheConfig struct {
MaxCollectionBytes int64
MaxPermissionEntries int
MaxUUIDEntries int
+ MaxSessions int
}
type Cluster struct {
diff --git a/sdk/go/arvados/fs_base.go b/sdk/go/arvados/fs_base.go
index aa75fee7c..2478641df 100644
--- a/sdk/go/arvados/fs_base.go
+++ b/sdk/go/arvados/fs_base.go
@@ -106,6 +106,9 @@ type FileSystem interface {
// path is "", flush all dirs/streams; otherwise, flush only
// the specified dir/stream.
Flush(path string, shortBlocks bool) error
+
+ // Estimate current memory usage.
+ MemorySize() int64
}
type inode interface {
@@ -156,6 +159,7 @@ type inode interface {
sync.Locker
RLock()
RUnlock()
+ MemorySize() int64
}
type fileinfo struct {
@@ -229,6 +233,13 @@ func (*nullnode) Child(name string, replace func(inode) (inode, error)) (inode,
return nil, ErrNotADirectory
}
+func (*nullnode) MemorySize() int64 {
+ // Types that embed nullnode should report their own size, but
+ // if they don't, we at least report a non-zero size to ensure
+ // a large tree doesn't get reported as 0 bytes.
+ return 64
+}
+
type treenode struct {
fs FileSystem
parent inode
@@ -319,6 +330,15 @@ func (n *treenode) Sync() error {
return nil
}
+func (n *treenode) MemorySize() (size int64) {
+ n.RLock()
+ defer n.RUnlock()
+ for _, inode := range n.inodes {
+ size += inode.MemorySize()
+ }
+ return
+}
+
type fileSystem struct {
root inode
fsBackend
@@ -607,6 +627,10 @@ func (fs *fileSystem) Flush(string, bool) error {
return ErrInvalidOperation
}
+func (fs *fileSystem) MemorySize() int64 {
+ return fs.root.MemorySize()
+}
+
// rlookup (recursive lookup) returns the inode for the file/directory
// with the given name (which may contain "/" separators). If no such
// file/directory exists, the returned node is nil.
diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index 1de558a1b..0233826a7 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -38,9 +38,6 @@ type CollectionFileSystem interface {
// Total data bytes in all files.
Size() int64
-
- // Memory consumed by buffered file data.
- memorySize() int64
}
type collectionFileSystem struct {
@@ -232,10 +229,10 @@ func (fs *collectionFileSystem) Flush(path string, shortBlocks bool) error {
return dn.flush(context.TODO(), names, flushOpts{sync: false, shortBlocks: shortBlocks})
}
-func (fs *collectionFileSystem) memorySize() int64 {
+func (fs *collectionFileSystem) MemorySize() int64 {
fs.fileSystem.root.Lock()
defer fs.fileSystem.root.Unlock()
- return fs.fileSystem.root.(*dirnode).memorySize()
+ return fs.fileSystem.root.(*dirnode).MemorySize()
}
func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
@@ -879,14 +876,14 @@ func (dn *dirnode) flush(ctx context.Context, names []string, opts flushOpts) er
}
// caller must have write lock.
-func (dn *dirnode) memorySize() (size int64) {
+func (dn *dirnode) MemorySize() (size int64) {
for _, name := range dn.sortedNames() {
node := dn.inodes[name]
node.Lock()
defer node.Unlock()
switch node := node.(type) {
case *dirnode:
- size += node.memorySize()
+ size += node.MemorySize()
case *filenode:
for _, seg := range node.segments {
switch seg := seg.(type) {
diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index 59a6a6ba8..05c8ea61a 100644
--- a/sdk/go/arvados/fs_collection_test.go
+++ b/sdk/go/arvados/fs_collection_test.go
@@ -1153,9 +1153,9 @@ func (s *CollectionFSSuite) TestFlushAll(c *check.C) {
fs.Flush("", true)
}
- size := fs.memorySize()
+ size := fs.MemorySize()
if !c.Check(size <= 1<<24, check.Equals, true) {
- c.Logf("at dir%d fs.memorySize()=%d", i, size)
+ c.Logf("at dir%d fs.MemorySize()=%d", i, size)
return
}
}
@@ -1188,13 +1188,13 @@ func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
c.Assert(err, check.IsNil)
}
}
- c.Check(fs.memorySize(), check.Equals, int64(nDirs*67<<20))
+ c.Check(fs.MemorySize(), check.Equals, int64(nDirs*67<<20))
c.Check(flushed, check.Equals, int64(0))
waitForFlush := func(expectUnflushed, expectFlushed int64) {
- for deadline := time.Now().Add(5 * time.Second); fs.memorySize() > expectUnflushed && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
+ for deadline := time.Now().Add(5 * time.Second); fs.MemorySize() > expectUnflushed && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
}
- c.Check(fs.memorySize(), check.Equals, expectUnflushed)
+ c.Check(fs.MemorySize(), check.Equals, expectUnflushed)
c.Check(flushed, check.Equals, expectFlushed)
}
diff --git a/sdk/go/arvados/fs_deferred.go b/sdk/go/arvados/fs_deferred.go
index 254b90c81..bb6c7a262 100644
--- a/sdk/go/arvados/fs_deferred.go
+++ b/sdk/go/arvados/fs_deferred.go
@@ -112,3 +112,4 @@ func (dn *deferrednode) RLock() { dn.realinode().RLock(
func (dn *deferrednode) RUnlock() { dn.realinode().RUnlock() }
func (dn *deferrednode) FS() FileSystem { return dn.currentinode().FS() }
func (dn *deferrednode) Parent() inode { return dn.currentinode().Parent() }
+func (dn *deferrednode) MemorySize() int64 { return dn.currentinode().MemorySize() }
diff --git a/services/keep-web/cache.go b/services/keep-web/cache.go
index eeb78ad90..71e995330 100644
--- a/services/keep-web/cache.go
+++ b/services/keep-web/cache.go
@@ -6,23 +6,27 @@ package main
import (
"sync"
+ "sync/atomic"
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
- "github.com/hashicorp/golang-lru"
+ "git.arvados.org/arvados.git/sdk/go/keepclient"
+ lru "github.com/hashicorp/golang-lru"
"github.com/prometheus/client_golang/prometheus"
)
const metricsUpdateInterval = time.Second / 10
type cache struct {
- config *arvados.WebDAVCacheConfig
+ cluster *arvados.Cluster
+ config *arvados.WebDAVCacheConfig // TODO: use cluster.Collections.WebDAV instead
registry *prometheus.Registry
metrics cacheMetrics
pdhs *lru.TwoQueueCache
collections *lru.TwoQueueCache
permissions *lru.TwoQueueCache
+ sessions *lru.TwoQueueCache
setupOnce sync.Once
}
@@ -30,9 +34,12 @@ type cacheMetrics struct {
requests prometheus.Counter
collectionBytes prometheus.Gauge
collectionEntries prometheus.Gauge
+ sessionEntries prometheus.Gauge
collectionHits prometheus.Counter
pdhHits prometheus.Counter
permissionHits prometheus.Counter
+ sessionHits prometheus.Counter
+ sessionMisses prometheus.Counter
apiCalls prometheus.Counter
}
@@ -86,6 +93,27 @@ func (m *cacheMetrics) setup(reg *prometheus.Registry) {
Help: "Number of manifests in cache.",
})
reg.MustRegister(m.collectionEntries)
+ m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "keepweb_sessions",
+ Name: "active",
+ Help: "Number of active token sessions.",
+ })
+ reg.MustRegister(m.sessionEntries)
+ m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepweb_sessions",
+ Name: "hits",
+ Help: "Number of token session cache hits.",
+ })
+ reg.MustRegister(m.sessionHits)
+ m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepweb_sessions",
+ Name: "misses",
+ Help: "Number of token session cache misses.",
+ })
+ reg.MustRegister(m.sessionMisses)
}
type cachedPDH struct {
@@ -102,6 +130,11 @@ type cachedPermission struct {
expire time.Time
}
+type cachedSession struct {
+ expire time.Time
+ fs atomic.Value
+}
+
func (c *cache) setup() {
var err error
c.pdhs, err = lru.New2Q(c.config.MaxUUIDEntries)
@@ -116,6 +149,10 @@ func (c *cache) setup() {
if err != nil {
panic(err)
}
+ c.sessions, err = lru.New2Q(c.config.MaxSessions)
+ if err != nil {
+ panic(err)
+ }
reg := c.registry
if reg == nil {
@@ -132,6 +169,7 @@ func (c *cache) setup() {
func (c *cache) updateGauges() {
c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
c.metrics.collectionEntries.Set(float64(c.collections.Len()))
+ c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
}
var selectPDH = map[string]interface{}{
@@ -165,6 +203,80 @@ func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvad
return err
}
+// ResetSession unloads any potentially stale state. Should be called
+// after write operations, so subsequent reads don't return stale
+// data.
+func (c *cache) ResetSession(token string) {
+ c.setupOnce.Do(c.setup)
+ c.sessions.Remove(token)
+}
+
+// Get a long-lived CustomFileSystem suitable for doing a read operation
+// with the given token.
+func (c *cache) GetSession(token string) (arvados.CustomFileSystem, error) {
+ c.setupOnce.Do(c.setup)
+ now := time.Now()
+ ent, _ := c.sessions.Get(token)
+ sess, _ := ent.(*cachedSession)
+ if sess == nil {
+ c.metrics.sessionMisses.Inc()
+ sess = &cachedSession{
+ expire: now.Add(c.config.TTL.Duration()),
+ }
+ c.sessions.Add(token, sess)
+ } else if sess.expire.Before(now) {
+ c.metrics.sessionMisses.Inc()
+ sess.fs.Store(nil)
+ } else {
+ c.metrics.sessionHits.Inc()
+ }
+ go c.pruneSessions()
+ fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
+ if fs != nil {
+ return fs, nil
+ }
+ ac, err := arvados.NewClientFromConfig(c.cluster)
+ if err != nil {
+ return nil, err
+ }
+ ac.AuthToken = token
+ arv, err := arvadosclient.New(ac)
+ if err != nil {
+ return nil, err
+ }
+ kc := keepclient.New(arv)
+ fs = ac.SiteFileSystem(kc)
+ fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
+ sess.fs.Store(fs)
+ return fs, nil
+}
+
+func (c *cache) pruneSessions() {
+ now := time.Now()
+ var size int64
+ for _, token := range c.sessions.Keys() {
+ ent, ok := c.sessions.Peek(token)
+ if !ok {
+ continue
+ }
+ s := ent.(*cachedSession)
+ if s.expire.Before(now) {
+ c.sessions.Remove(token)
+ continue
+ }
+ fs, _ := s.fs.Load().(arvados.CustomFileSystem)
+ if fs == nil {
+ continue
+ }
+ size += fs.MemorySize()
+ }
+ if size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
+ for _, token := range c.sessions.Keys() {
+ c.sessions.Remove(token)
+ }
+ }
+}
+
func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
c.setupOnce.Do(c.setup)
c.metrics.requests.Inc()
@@ -288,7 +400,7 @@ func (c *cache) pruneCollections() {
}
}
for i, k := range keys {
- if size <= c.config.MaxCollectionBytes {
+ if size <= c.config.MaxCollectionBytes/2 {
break
}
if expired[i] {
@@ -300,8 +412,8 @@ func (c *cache) pruneCollections() {
}
}
-// collectionBytes returns the approximate memory size of the
-// collection cache.
+// collectionBytes returns the approximate combined memory size of the
+// collection cache and session filesystem cache.
func (c *cache) collectionBytes() uint64 {
var size uint64
for _, k := range c.collections.Keys() {
@@ -311,6 +423,15 @@ func (c *cache) collectionBytes() uint64 {
}
size += uint64(len(v.(*cachedCollection).collection.ManifestText))
}
+ for _, token := range c.sessions.Keys() {
+ ent, ok := c.sessions.Peek(token)
+ if !ok {
+ continue
+ }
+ if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
+ size += uint64(fs.MemorySize())
+ }
+ }
return size
}
diff --git a/services/keep-web/handler.go b/services/keep-web/handler.go
index 2d6fb78f8..4ea2fa2f6 100644
--- a/services/keep-web/handler.go
+++ b/services/keep-web/handler.go
@@ -520,7 +520,8 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
func (h *handler) getClients(reqID, token string) (arv *arvadosclient.ArvadosClient, kc *keepclient.KeepClient, client *arvados.Client, release func(), err error) {
arv = h.clientPool.Get()
if arv == nil {
- return nil, nil, nil, nil, err
+ err = h.clientPool.Err()
+ return
}
release = func() { h.clientPool.Put(arv) }
arv.ApiToken = token
@@ -548,14 +549,11 @@ func (h *handler) serveSiteFS(w http.ResponseWriter, r *http.Request, tokens []s
http.Error(w, errReadOnly.Error(), http.StatusMethodNotAllowed)
return
}
- _, kc, client, release, err := h.getClients(r.Header.Get("X-Request-Id"), tokens[0])
+ fs, err := h.Config.Cache.GetSession(tokens[0])
if err != nil {
- http.Error(w, "Pool failed: "+h.clientPool.Err().Error(), http.StatusInternalServerError)
+ http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
- defer release()
-
- fs := client.SiteFileSystem(kc)
fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
f, err := fs.Open(r.URL.Path)
if os.IsNotExist(err) {
diff --git a/services/keep-web/main.go b/services/keep-web/main.go
index 647eab165..a62e0abb6 100644
--- a/services/keep-web/main.go
+++ b/services/keep-web/main.go
@@ -38,6 +38,7 @@ func newConfig(arvCfg *arvados.Config) *Config {
}
cfg.cluster = cls
cfg.Cache.config = &cfg.cluster.Collections.WebDAVCache
+ cfg.Cache.cluster = cls
return &cfg
}
diff --git a/services/keep-web/s3.go b/services/keep-web/s3.go
index 1c84976d2..d500f1e65 100644
--- a/services/keep-web/s3.go
+++ b/services/keep-web/s3.go
@@ -243,15 +243,29 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
return false
}
- _, kc, client, release, err := h.getClients(r.Header.Get("X-Request-Id"), token)
- if err != nil {
- s3ErrorResponse(w, InternalError, "Pool failed: "+h.clientPool.Err().Error(), r.URL.Path, http.StatusInternalServerError)
- return true
+ var err error
+ var fs arvados.CustomFileSystem
+ if r.Method == http.MethodGet || r.Method == http.MethodHead {
+ // Use a single session (cached FileSystem) across
+ // multiple read requests.
+ fs, err = h.Config.Cache.GetSession(token)
+ if err != nil {
+ s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
+ return true
+ }
+ } else {
+ // Create a FileSystem for this request, to avoid
+ // exposing incomplete write operations to concurrent
+ // requests.
+ _, kc, client, release, err := h.getClients(r.Header.Get("X-Request-Id"), token)
+ if err != nil {
+ s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
+ return true
+ }
+ defer release()
+ fs = client.SiteFileSystem(kc)
+ fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
}
- defer release()
-
- fs := client.SiteFileSystem(kc)
- fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
var objectNameGiven bool
var bucketName string
@@ -400,6 +414,8 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
return true
}
+ // Ensure a subsequent read operation will see the changes.
+ h.Config.Cache.ResetSession(token)
w.WriteHeader(http.StatusOK)
return true
case r.Method == http.MethodDelete:
@@ -447,11 +463,12 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
return true
}
+ // Ensure a subsequent read operation will see the changes.
+ h.Config.Cache.ResetSession(token)
w.WriteHeader(http.StatusNoContent)
return true
default:
s3ErrorResponse(w, InvalidRequest, "method not allowed", r.URL.Path, http.StatusMethodNotAllowed)
-
return true
}
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list