[ARVADOS] updated: 2.1.0-119-gf3451bb69
Git user
git at public.arvados.org
Wed Mar 10 16:05:44 UTC 2021
Summary of changes:
lib/config/config.default.yml | 29 ++++---
lib/config/generated_config.go | 29 ++++---
sdk/go/arvados/config.go | 1 +
sdk/go/arvados/fs_base.go | 24 ++++++
sdk/go/arvados/fs_collection.go | 11 +--
sdk/go/arvados/fs_collection_test.go | 10 +--
sdk/go/arvados/fs_deferred.go | 1 +
services/keep-web/cache.go | 153 +++++++++++++++++++++++++++++++++--
services/keep-web/handler.go | 10 +--
services/keep-web/main.go | 1 +
services/keep-web/s3.go | 63 ++++++++++++---
services/keep-web/s3_test.go | 54 ++++++++++++-
services/keep-web/server_test.go | 2 +-
13 files changed, 329 insertions(+), 59 deletions(-)
via f3451bb6907e8692fe8ef55af0174fc7ed36c8bc (commit)
via 2ae2378cb44c761ed5f3df4c97247d7fba862934 (commit)
via 252e51a36db295cde96ebc48f72ee4aa6b48dd81 (commit)
via a0bf72872492dd3b4a89e5bc359f480cbb4c76fb (commit)
via 559cf52d3c14d03b09e56f0beb337a3d57b01918 (commit)
via 47afa97d0e5220423c9101acb8ca69420e260e50 (commit)
via 0b714d15c15340f3f52c331528d5cbb4cc422f07 (commit)
from 0d3382aa338a2e4f33f5ed98067ff5cacdb6c6b0 (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit f3451bb6907e8692fe8ef55af0174fc7ed36c8bc
Author: Tom Clegg <tom at curii.com>
Date: Wed Mar 3 10:05:02 2021 -0500
16745: Update test.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/services/keep-web/server_test.go b/services/keep-web/server_test.go
index 0a1c7d1b3..5c68eb424 100644
--- a/services/keep-web/server_test.go
+++ b/services/keep-web/server_test.go
@@ -395,7 +395,7 @@ func (s *IntegrationSuite) TestMetrics(c *check.C) {
c.Check(counters["arvados_keepweb_collectioncache_permission_hits//"].Value, check.Equals, int64(1))
c.Check(gauges["arvados_keepweb_collectioncache_cached_manifests//"].Value, check.Equals, float64(1))
// FooCollection's cached manifest size is 45 ("1f4b0....+45") plus one 51-byte blob signature
- c.Check(gauges["arvados_keepweb_collectioncache_cached_manifest_bytes//"].Value, check.Equals, float64(45+51))
+ c.Check(gauges["arvados_keepweb_sessions_cached_collection_bytes//"].Value, check.Equals, float64(45+51))
// If the Host header indicates a collection, /metrics.json
// refers to a file in the collection -- the metrics handler
commit 2ae2378cb44c761ed5f3df4c97247d7fba862934
Author: Tom Clegg <tom at curii.com>
Date: Tue Mar 2 22:25:11 2021 -0500
16745: Rename session cache size metric.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/services/keep-web/cache.go b/services/keep-web/cache.go
index ec48be609..07db7a016 100644
--- a/services/keep-web/cache.go
+++ b/services/keep-web/cache.go
@@ -81,8 +81,8 @@ func (m *cacheMetrics) setup(reg *prometheus.Registry) {
reg.MustRegister(m.apiCalls)
m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "arvados",
- Subsystem: "keepweb_collectioncache",
- Name: "cached_manifest_bytes",
+ Subsystem: "keepweb_sessions",
+ Name: "cached_collection_bytes",
Help: "Total size of all cached manifests and sessions.",
})
reg.MustRegister(m.collectionBytes)
commit 252e51a36db295cde96ebc48f72ee4aa6b48dd81
Author: Tom Clegg <tom at curii.com>
Date: Tue Mar 2 16:27:18 2021 -0500
16745: Reject unsupported APIs instead of mishandling.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/services/keep-web/s3.go b/services/keep-web/s3.go
index e6ee3e2dc..e08b2b844 100644
--- a/services/keep-web/s3.go
+++ b/services/keep-web/s3.go
@@ -102,6 +102,7 @@ func s3stringToSign(alg, scope, signedHeaders string, r *http.Request) (string,
normalizedURL := *r.URL
normalizedURL.RawPath = ""
normalizedURL.Path = reMultipleSlashChars.ReplaceAllString(normalizedURL.Path, "/")
+ ctxlog.FromContext(r.Context()).Infof("escapedPath %s", normalizedURL.EscapedPath())
canonicalRequest := fmt.Sprintf("%s\n%s\n%s\n%s\n%s\n%s", r.Method, normalizedURL.EscapedPath(), s3querystring(r.URL), canonicalHeaders, signedHeaders, r.Header.Get("X-Amz-Content-Sha256"))
ctxlog.FromContext(r.Context()).Debugf("s3stringToSign: canonicalRequest %s", canonicalRequest)
return fmt.Sprintf("%s\n%s\n%s\n%s", alg, r.Header.Get("X-Amz-Date"), scope, hashdigest(sha256.New(), canonicalRequest)), nil
@@ -206,6 +207,8 @@ var UnauthorizedAccess = "UnauthorizedAccess"
var InvalidRequest = "InvalidRequest"
var SignatureDoesNotMatch = "SignatureDoesNotMatch"
+var reRawQueryIndicatesAPI = regexp.MustCompile(`^[a-z]+(&|$)`)
+
// serveS3 handles r and returns true if r is a request from an S3
// client, otherwise it returns false.
func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
@@ -277,13 +280,23 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
// GetBucketLocation
w.Header().Set("Content-Type", "application/xml")
io.WriteString(w, xml.Header)
- fmt.Fprintln(w, `<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">`+h.Config.cluster.ClusterID+`</LocationConstraint>`)
+ fmt.Fprintln(w, `<LocationConstraint><LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">`+
+ h.Config.cluster.ClusterID+
+ `</LocationConstraint></LocationConstraint>`)
+ } else if reRawQueryIndicatesAPI.MatchString(r.URL.RawQuery) {
+ // GetBucketWebsite ("GET /bucketid/?website"), GetBucketTagging, etc.
+ s3ErrorResponse(w, InvalidRequest, "API not supported", r.URL.Path+"?"+r.URL.RawQuery, http.StatusBadRequest)
} else {
// ListObjects
h.s3list(bucketName, w, r, fs)
}
return true
case r.Method == http.MethodGet || r.Method == http.MethodHead:
+ if reRawQueryIndicatesAPI.MatchString(r.URL.RawQuery) {
+ // GetObjectRetention ("GET /bucketid/objectid?retention&versionID=..."), etc.
+ s3ErrorResponse(w, InvalidRequest, "API not supported", r.URL.Path+"?"+r.URL.RawQuery, http.StatusBadRequest)
+ return true
+ }
fi, err := fs.Stat(fspath)
if r.Method == "HEAD" && !objectNameGiven {
// HeadBucket
@@ -313,6 +326,11 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
http.FileServer(fs).ServeHTTP(w, &r)
return true
case r.Method == http.MethodPut:
+ if reRawQueryIndicatesAPI.MatchString(r.URL.RawQuery) {
+ // PutObjectAcl ("PUT /bucketid/objectid?acl&versionID=..."), etc.
+ s3ErrorResponse(w, InvalidRequest, "API not supported", r.URL.Path+"?"+r.URL.RawQuery, http.StatusBadRequest)
+ return true
+ }
if !objectNameGiven {
s3ErrorResponse(w, InvalidArgument, "Missing object name in PUT request.", r.URL.Path, http.StatusBadRequest)
return true
@@ -409,6 +427,11 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
w.WriteHeader(http.StatusOK)
return true
case r.Method == http.MethodDelete:
+ if reRawQueryIndicatesAPI.MatchString(r.URL.RawQuery) {
+ // DeleteObjectTagging ("DELETE /bucketid/objectid?tagging&versionID=..."), etc.
+ s3ErrorResponse(w, InvalidRequest, "API not supported", r.URL.Path+"?"+r.URL.RawQuery, http.StatusBadRequest)
+ return true
+ }
if !objectNameGiven || r.URL.Path == "/" {
s3ErrorResponse(w, InvalidArgument, "missing object name in DELETE request", r.URL.Path, http.StatusBadRequest)
return true
diff --git a/services/keep-web/s3_test.go b/services/keep-web/s3_test.go
index 3a4c4b224..6de36b6b2 100644
--- a/services/keep-web/s3_test.go
+++ b/services/keep-web/s3_test.go
@@ -76,7 +76,7 @@ func (s *IntegrationSuite) s3setup(c *check.C) s3stage {
auth := aws.NewAuth(arvadostest.ActiveTokenUUID, arvadostest.ActiveToken, "", time.Now().Add(time.Hour))
region := aws.Region{
- Name: s.testServer.Addr,
+ Name: "zzzzz",
S3Endpoint: "http://" + s.testServer.Addr,
}
client := s3.New(*auth, region)
@@ -451,7 +451,7 @@ func (stage *s3stage) writeBigDirs(c *check.C, dirs int, filesPerDir int) {
}
func (s *IntegrationSuite) sign(c *check.C, req *http.Request, key, secret string) {
- scope := "20200202/region/service/aws4_request"
+ scope := "20200202/zzzzz/service/aws4_request"
signedHeaders := "date"
req.Header.Set("Date", time.Now().UTC().Format(time.RFC1123))
stringToSign, err := s3stringToSign(s3SignAlgorithm, scope, signedHeaders, req)
@@ -556,7 +556,7 @@ func (s *IntegrationSuite) TestS3NormalizeURIForSignature(c *check.C) {
{"/foo%5bbar", "/foo%5Bbar"}, // %XX must be uppercase
} {
date := time.Now().UTC().Format("20060102T150405Z")
- scope := "20200202/fakeregion/S3/aws4_request"
+ scope := "20200202/zzzzz/S3/aws4_request"
canonicalRequest := fmt.Sprintf("%s\n%s\n%s\n%s\n%s\n%s", "GET", trial.normalizedPath, "", "host:host.example.com\n", "host", "")
c.Logf("canonicalRequest %q", canonicalRequest)
expect := fmt.Sprintf("%s\n%s\n%s\n%s", s3SignAlgorithm, date, scope, hashdigest(sha256.New(), canonicalRequest))
@@ -574,6 +574,23 @@ func (s *IntegrationSuite) TestS3NormalizeURIForSignature(c *check.C) {
}
}
+func (s *IntegrationSuite) TestS3GetBucketLocation(c *check.C) {
+ stage := s.s3setup(c)
+ defer stage.teardown(c)
+ for _, bucket := range []*s3.Bucket{stage.collbucket, stage.projbucket} {
+ req, err := http.NewRequest("GET", bucket.URL("/"), nil)
+ c.Check(err, check.IsNil)
+ req.Header.Set("Authorization", "AWS "+arvadostest.ActiveTokenV2+":none")
+ req.URL.RawQuery = "location"
+ resp, err := http.DefaultClient.Do(req)
+ c.Assert(err, check.IsNil)
+ c.Check(resp.Header.Get("Content-Type"), check.Equals, "application/xml")
+ buf, err := ioutil.ReadAll(resp.Body)
+ c.Assert(err, check.IsNil)
+ c.Check(string(buf), check.Equals, "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<LocationConstraint><LocationConstraint xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">zzzzz</LocationConstraint></LocationConstraint>\n")
+ }
+}
+
func (s *IntegrationSuite) TestS3GetBucketVersioning(c *check.C) {
stage := s.s3setup(c)
defer stage.teardown(c)
@@ -591,6 +608,37 @@ func (s *IntegrationSuite) TestS3GetBucketVersioning(c *check.C) {
}
}
+func (s *IntegrationSuite) TestS3UnsupportedAPIs(c *check.C) {
+ stage := s.s3setup(c)
+ defer stage.teardown(c)
+ for _, trial := range []struct {
+ method string
+ path string
+ rawquery string
+ }{
+ {"GET", "/", "acl&versionId=1234"}, // GetBucketAcl
+ {"GET", "/foo", "acl&versionId=1234"}, // GetObjectAcl
+ {"PUT", "/", "acl"}, // PutBucketAcl
+ {"PUT", "/foo", "acl"}, // PutObjectAcl
+ {"DELETE", "/", "tagging"}, // DeleteBucketTagging
+ {"DELETE", "/foo", "tagging"}, // DeleteObjectTagging
+ } {
+ for _, bucket := range []*s3.Bucket{stage.collbucket, stage.projbucket} {
+ c.Logf("trial %v bucket %v", trial, bucket)
+ req, err := http.NewRequest(trial.method, bucket.URL(trial.path), nil)
+ c.Check(err, check.IsNil)
+ req.Header.Set("Authorization", "AWS "+arvadostest.ActiveTokenV2+":none")
+ req.URL.RawQuery = trial.rawquery
+ resp, err := http.DefaultClient.Do(req)
+ c.Assert(err, check.IsNil)
+ c.Check(resp.Header.Get("Content-Type"), check.Equals, "application/xml")
+ buf, err := ioutil.ReadAll(resp.Body)
+ c.Assert(err, check.IsNil)
+ c.Check(string(buf), check.Matches, "(?ms).*InvalidRequest.*API not supported.*")
+ }
+ }
+}
+
// If there are no CommonPrefixes entries, the CommonPrefixes XML tag
// should not appear at all.
func (s *IntegrationSuite) TestS3ListNoCommonPrefixes(c *check.C) {
commit a0bf72872492dd3b4a89e5bc359f480cbb4c76fb
Author: Tom Clegg <tom at curii.com>
Date: Tue Mar 2 11:12:11 2021 -0500
16745: Handle GetBucketLocation API.
Previously misinterpreted as ListObjects with no delimiter, which is
very slow.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/services/keep-web/s3.go b/services/keep-web/s3.go
index 3e82c5867..e6ee3e2dc 100644
--- a/services/keep-web/s3.go
+++ b/services/keep-web/s3.go
@@ -273,6 +273,11 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
w.Header().Set("Content-Type", "application/xml")
io.WriteString(w, xml.Header)
fmt.Fprintln(w, `<VersioningConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/"/>`)
+ } else if _, ok = r.URL.Query()["location"]; ok {
+ // GetBucketLocation
+ w.Header().Set("Content-Type", "application/xml")
+ io.WriteString(w, xml.Header)
+ fmt.Fprintln(w, `<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">`+h.Config.cluster.ClusterID+`</LocationConstraint>`)
} else {
// ListObjects
h.s3list(bucketName, w, r, fs)
commit 559cf52d3c14d03b09e56f0beb337a3d57b01918
Author: Tom Clegg <tom at curii.com>
Date: Tue Mar 2 09:55:16 2021 -0500
16745: Don't store nil in an atomic.Value (panic).
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/services/keep-web/cache.go b/services/keep-web/cache.go
index af143c77e..ec48be609 100644
--- a/services/keep-web/cache.go
+++ b/services/keep-web/cache.go
@@ -218,6 +218,7 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, error) {
now := time.Now()
ent, _ := c.sessions.Get(token)
sess, _ := ent.(*cachedSession)
+ expired := false
if sess == nil {
c.metrics.sessionMisses.Inc()
sess = &cachedSession{
@@ -226,13 +227,13 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, error) {
c.sessions.Add(token, sess)
} else if sess.expire.Before(now) {
c.metrics.sessionMisses.Inc()
- sess.fs.Store(nil)
+ expired = true
} else {
c.metrics.sessionHits.Inc()
}
go c.pruneSessions()
fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
- if fs != nil {
+ if fs != nil && !expired {
return fs, nil
}
ac, err := arvados.NewClientFromConfig(c.cluster)
commit 47afa97d0e5220423c9101acb8ca69420e260e50
Author: Tom Clegg <tom at curii.com>
Date: Mon Feb 22 21:46:27 2021 -0500
16745: Prune enough sessions to reach size limit, not all.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/services/keep-web/cache.go b/services/keep-web/cache.go
index 71e995330..af143c77e 100644
--- a/services/keep-web/cache.go
+++ b/services/keep-web/cache.go
@@ -83,7 +83,7 @@ func (m *cacheMetrics) setup(reg *prometheus.Registry) {
Namespace: "arvados",
Subsystem: "keepweb_collectioncache",
Name: "cached_manifest_bytes",
- Help: "Total size of all manifests in cache.",
+ Help: "Total size of all cached manifests and sessions.",
})
reg.MustRegister(m.collectionBytes)
m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
@@ -251,10 +251,13 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, error) {
return fs, nil
}
+// Remove all expired session cache entries, then remove more entries
+// until approximate remaining size <= maxsize/2
func (c *cache) pruneSessions() {
now := time.Now()
var size int64
- for _, token := range c.sessions.Keys() {
+ keys := c.sessions.Keys()
+ for _, token := range keys {
ent, ok := c.sessions.Peek(token)
if !ok {
continue
@@ -264,16 +267,28 @@ func (c *cache) pruneSessions() {
c.sessions.Remove(token)
continue
}
+ if fs, ok := s.fs.Load().(arvados.CustomFileSystem); ok {
+ size += fs.MemorySize()
+ }
+ }
+ // Remove tokens until reaching size limit, starting with the
+ // least frequently used entries (which Keys() returns last).
+ for i := len(keys) - 1; i >= 0; i-- {
+ token := keys[i]
+ if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
+ break
+ }
+ ent, ok := c.sessions.Peek(token)
+ if !ok {
+ continue
+ }
+ s := ent.(*cachedSession)
fs, _ := s.fs.Load().(arvados.CustomFileSystem)
if fs == nil {
continue
}
- size += fs.MemorySize()
- }
- if size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
- for _, token := range c.sessions.Keys() {
- c.sessions.Remove(token)
- }
+ c.sessions.Remove(token)
+ size -= fs.MemorySize()
}
}
commit 0b714d15c15340f3f52c331528d5cbb4cc422f07
Author: Tom Clegg <tom at curii.com>
Date: Mon Feb 22 11:10:10 2021 -0500
16745: Keep a SiteFileSystem alive for multiple read requests.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 3ecff5448..1d3fcb748 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -523,21 +523,30 @@ Clusters:
TrustAllContent: false
# Cache parameters for WebDAV content serving:
- # * TTL: Maximum time to cache manifests and permission checks.
- # * UUIDTTL: Maximum time to cache collection state.
- # * MaxBlockEntries: Maximum number of block cache entries.
- # * MaxCollectionEntries: Maximum number of collection cache entries.
- # * MaxCollectionBytes: Approximate memory limit for collection cache.
- # * MaxPermissionEntries: Maximum number of permission cache entries.
- # * MaxUUIDEntries: Maximum number of UUID cache entries.
WebDAVCache:
+ # Time to cache manifests, permission checks, and sessions.
TTL: 300s
+
+ # Time to cache collection state.
UUIDTTL: 5s
- MaxBlockEntries: 4
+
+ # Block cache entries. Each block consumes up to 64 MiB RAM.
+ MaxBlockEntries: 4
+
+ # Collection cache entries.
MaxCollectionEntries: 1000
- MaxCollectionBytes: 100000000
+
+ # Approximate memory limit (in bytes) for collection cache.
+ MaxCollectionBytes: 100000000
+
+ # Permission cache entries.
MaxPermissionEntries: 1000
- MaxUUIDEntries: 1000
+
+ # UUID cache entries.
+ MaxUUIDEntries: 1000
+
+ # Persistent sessions.
+ MaxSessions: 100
Login:
# One of the following mechanisms (SSO, Google, PAM, LDAP, or
diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go
index cd752ab75..949a842c1 100644
--- a/lib/config/generated_config.go
+++ b/lib/config/generated_config.go
@@ -529,21 +529,30 @@ Clusters:
TrustAllContent: false
# Cache parameters for WebDAV content serving:
- # * TTL: Maximum time to cache manifests and permission checks.
- # * UUIDTTL: Maximum time to cache collection state.
- # * MaxBlockEntries: Maximum number of block cache entries.
- # * MaxCollectionEntries: Maximum number of collection cache entries.
- # * MaxCollectionBytes: Approximate memory limit for collection cache.
- # * MaxPermissionEntries: Maximum number of permission cache entries.
- # * MaxUUIDEntries: Maximum number of UUID cache entries.
WebDAVCache:
+ # Time to cache manifests, permission checks, and sessions.
TTL: 300s
+
+ # Time to cache collection state.
UUIDTTL: 5s
- MaxBlockEntries: 4
+
+ # Block cache entries. Each block consumes up to 64 MiB RAM.
+ MaxBlockEntries: 4
+
+ # Collection cache entries.
MaxCollectionEntries: 1000
- MaxCollectionBytes: 100000000
+
+ # Approximate memory limit (in bytes) for collection cache.
+ MaxCollectionBytes: 100000000
+
+ # Permission cache entries.
MaxPermissionEntries: 1000
- MaxUUIDEntries: 1000
+
+ # UUID cache entries.
+ MaxUUIDEntries: 1000
+
+ # Persistent sessions.
+ MaxSessions: 100
Login:
# One of the following mechanisms (SSO, Google, PAM, LDAP, or
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index 2e39985c2..6fe7a022b 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -65,6 +65,7 @@ type WebDAVCacheConfig struct {
MaxCollectionBytes int64
MaxPermissionEntries int
MaxUUIDEntries int
+ MaxSessions int
}
type Cluster struct {
diff --git a/sdk/go/arvados/fs_base.go b/sdk/go/arvados/fs_base.go
index aa75fee7c..2478641df 100644
--- a/sdk/go/arvados/fs_base.go
+++ b/sdk/go/arvados/fs_base.go
@@ -106,6 +106,9 @@ type FileSystem interface {
// path is "", flush all dirs/streams; otherwise, flush only
// the specified dir/stream.
Flush(path string, shortBlocks bool) error
+
+ // Estimate current memory usage.
+ MemorySize() int64
}
type inode interface {
@@ -156,6 +159,7 @@ type inode interface {
sync.Locker
RLock()
RUnlock()
+ MemorySize() int64
}
type fileinfo struct {
@@ -229,6 +233,13 @@ func (*nullnode) Child(name string, replace func(inode) (inode, error)) (inode,
return nil, ErrNotADirectory
}
+func (*nullnode) MemorySize() int64 {
+ // Types that embed nullnode should report their own size, but
+ // if they don't, we at least report a non-zero size to ensure
+ // a large tree doesn't get reported as 0 bytes.
+ return 64
+}
+
type treenode struct {
fs FileSystem
parent inode
@@ -319,6 +330,15 @@ func (n *treenode) Sync() error {
return nil
}
+func (n *treenode) MemorySize() (size int64) {
+ n.RLock()
+ defer n.RUnlock()
+ for _, inode := range n.inodes {
+ size += inode.MemorySize()
+ }
+ return
+}
+
type fileSystem struct {
root inode
fsBackend
@@ -607,6 +627,10 @@ func (fs *fileSystem) Flush(string, bool) error {
return ErrInvalidOperation
}
+func (fs *fileSystem) MemorySize() int64 {
+ return fs.root.MemorySize()
+}
+
// rlookup (recursive lookup) returns the inode for the file/directory
// with the given name (which may contain "/" separators). If no such
// file/directory exists, the returned node is nil.
diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index 1de558a1b..0233826a7 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -38,9 +38,6 @@ type CollectionFileSystem interface {
// Total data bytes in all files.
Size() int64
-
- // Memory consumed by buffered file data.
- memorySize() int64
}
type collectionFileSystem struct {
@@ -232,10 +229,10 @@ func (fs *collectionFileSystem) Flush(path string, shortBlocks bool) error {
return dn.flush(context.TODO(), names, flushOpts{sync: false, shortBlocks: shortBlocks})
}
-func (fs *collectionFileSystem) memorySize() int64 {
+func (fs *collectionFileSystem) MemorySize() int64 {
fs.fileSystem.root.Lock()
defer fs.fileSystem.root.Unlock()
- return fs.fileSystem.root.(*dirnode).memorySize()
+ return fs.fileSystem.root.(*dirnode).MemorySize()
}
func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
@@ -879,14 +876,14 @@ func (dn *dirnode) flush(ctx context.Context, names []string, opts flushOpts) er
}
// caller must have write lock.
-func (dn *dirnode) memorySize() (size int64) {
+func (dn *dirnode) MemorySize() (size int64) {
for _, name := range dn.sortedNames() {
node := dn.inodes[name]
node.Lock()
defer node.Unlock()
switch node := node.(type) {
case *dirnode:
- size += node.memorySize()
+ size += node.MemorySize()
case *filenode:
for _, seg := range node.segments {
switch seg := seg.(type) {
diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index 59a6a6ba8..05c8ea61a 100644
--- a/sdk/go/arvados/fs_collection_test.go
+++ b/sdk/go/arvados/fs_collection_test.go
@@ -1153,9 +1153,9 @@ func (s *CollectionFSSuite) TestFlushAll(c *check.C) {
fs.Flush("", true)
}
- size := fs.memorySize()
+ size := fs.MemorySize()
if !c.Check(size <= 1<<24, check.Equals, true) {
- c.Logf("at dir%d fs.memorySize()=%d", i, size)
+ c.Logf("at dir%d fs.MemorySize()=%d", i, size)
return
}
}
@@ -1188,13 +1188,13 @@ func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
c.Assert(err, check.IsNil)
}
}
- c.Check(fs.memorySize(), check.Equals, int64(nDirs*67<<20))
+ c.Check(fs.MemorySize(), check.Equals, int64(nDirs*67<<20))
c.Check(flushed, check.Equals, int64(0))
waitForFlush := func(expectUnflushed, expectFlushed int64) {
- for deadline := time.Now().Add(5 * time.Second); fs.memorySize() > expectUnflushed && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
+ for deadline := time.Now().Add(5 * time.Second); fs.MemorySize() > expectUnflushed && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
}
- c.Check(fs.memorySize(), check.Equals, expectUnflushed)
+ c.Check(fs.MemorySize(), check.Equals, expectUnflushed)
c.Check(flushed, check.Equals, expectFlushed)
}
diff --git a/sdk/go/arvados/fs_deferred.go b/sdk/go/arvados/fs_deferred.go
index 254b90c81..bb6c7a262 100644
--- a/sdk/go/arvados/fs_deferred.go
+++ b/sdk/go/arvados/fs_deferred.go
@@ -112,3 +112,4 @@ func (dn *deferrednode) RLock() { dn.realinode().RLock(
func (dn *deferrednode) RUnlock() { dn.realinode().RUnlock() }
func (dn *deferrednode) FS() FileSystem { return dn.currentinode().FS() }
func (dn *deferrednode) Parent() inode { return dn.currentinode().Parent() }
+func (dn *deferrednode) MemorySize() int64 { return dn.currentinode().MemorySize() }
diff --git a/services/keep-web/cache.go b/services/keep-web/cache.go
index eeb78ad90..71e995330 100644
--- a/services/keep-web/cache.go
+++ b/services/keep-web/cache.go
@@ -6,23 +6,27 @@ package main
import (
"sync"
+ "sync/atomic"
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
- "github.com/hashicorp/golang-lru"
+ "git.arvados.org/arvados.git/sdk/go/keepclient"
+ lru "github.com/hashicorp/golang-lru"
"github.com/prometheus/client_golang/prometheus"
)
const metricsUpdateInterval = time.Second / 10
type cache struct {
- config *arvados.WebDAVCacheConfig
+ cluster *arvados.Cluster
+ config *arvados.WebDAVCacheConfig // TODO: use cluster.Collections.WebDAV instead
registry *prometheus.Registry
metrics cacheMetrics
pdhs *lru.TwoQueueCache
collections *lru.TwoQueueCache
permissions *lru.TwoQueueCache
+ sessions *lru.TwoQueueCache
setupOnce sync.Once
}
@@ -30,9 +34,12 @@ type cacheMetrics struct {
requests prometheus.Counter
collectionBytes prometheus.Gauge
collectionEntries prometheus.Gauge
+ sessionEntries prometheus.Gauge
collectionHits prometheus.Counter
pdhHits prometheus.Counter
permissionHits prometheus.Counter
+ sessionHits prometheus.Counter
+ sessionMisses prometheus.Counter
apiCalls prometheus.Counter
}
@@ -86,6 +93,27 @@ func (m *cacheMetrics) setup(reg *prometheus.Registry) {
Help: "Number of manifests in cache.",
})
reg.MustRegister(m.collectionEntries)
+ m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "keepweb_sessions",
+ Name: "active",
+ Help: "Number of active token sessions.",
+ })
+ reg.MustRegister(m.sessionEntries)
+ m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepweb_sessions",
+ Name: "hits",
+ Help: "Number of token session cache hits.",
+ })
+ reg.MustRegister(m.sessionHits)
+ m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepweb_sessions",
+ Name: "misses",
+ Help: "Number of token session cache misses.",
+ })
+ reg.MustRegister(m.sessionMisses)
}
type cachedPDH struct {
@@ -102,6 +130,11 @@ type cachedPermission struct {
expire time.Time
}
+type cachedSession struct {
+ expire time.Time
+ fs atomic.Value
+}
+
func (c *cache) setup() {
var err error
c.pdhs, err = lru.New2Q(c.config.MaxUUIDEntries)
@@ -116,6 +149,10 @@ func (c *cache) setup() {
if err != nil {
panic(err)
}
+ c.sessions, err = lru.New2Q(c.config.MaxSessions)
+ if err != nil {
+ panic(err)
+ }
reg := c.registry
if reg == nil {
@@ -132,6 +169,7 @@ func (c *cache) setup() {
func (c *cache) updateGauges() {
c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
c.metrics.collectionEntries.Set(float64(c.collections.Len()))
+ c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
}
var selectPDH = map[string]interface{}{
@@ -165,6 +203,80 @@ func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvad
return err
}
+// ResetSession unloads any potentially stale state. Should be called
+// after write operations, so subsequent reads don't return stale
+// data.
+func (c *cache) ResetSession(token string) {
+ c.setupOnce.Do(c.setup)
+ c.sessions.Remove(token)
+}
+
+// Get a long-lived CustomFileSystem suitable for doing a read operation
+// with the given token.
+func (c *cache) GetSession(token string) (arvados.CustomFileSystem, error) {
+ c.setupOnce.Do(c.setup)
+ now := time.Now()
+ ent, _ := c.sessions.Get(token)
+ sess, _ := ent.(*cachedSession)
+ if sess == nil {
+ c.metrics.sessionMisses.Inc()
+ sess = &cachedSession{
+ expire: now.Add(c.config.TTL.Duration()),
+ }
+ c.sessions.Add(token, sess)
+ } else if sess.expire.Before(now) {
+ c.metrics.sessionMisses.Inc()
+ sess.fs.Store(nil)
+ } else {
+ c.metrics.sessionHits.Inc()
+ }
+ go c.pruneSessions()
+ fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
+ if fs != nil {
+ return fs, nil
+ }
+ ac, err := arvados.NewClientFromConfig(c.cluster)
+ if err != nil {
+ return nil, err
+ }
+ ac.AuthToken = token
+ arv, err := arvadosclient.New(ac)
+ if err != nil {
+ return nil, err
+ }
+ kc := keepclient.New(arv)
+ fs = ac.SiteFileSystem(kc)
+ fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
+ sess.fs.Store(fs)
+ return fs, nil
+}
+
+func (c *cache) pruneSessions() {
+ now := time.Now()
+ var size int64
+ for _, token := range c.sessions.Keys() {
+ ent, ok := c.sessions.Peek(token)
+ if !ok {
+ continue
+ }
+ s := ent.(*cachedSession)
+ if s.expire.Before(now) {
+ c.sessions.Remove(token)
+ continue
+ }
+ fs, _ := s.fs.Load().(arvados.CustomFileSystem)
+ if fs == nil {
+ continue
+ }
+ size += fs.MemorySize()
+ }
+ if size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
+ for _, token := range c.sessions.Keys() {
+ c.sessions.Remove(token)
+ }
+ }
+}
+
func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
c.setupOnce.Do(c.setup)
c.metrics.requests.Inc()
@@ -288,7 +400,7 @@ func (c *cache) pruneCollections() {
}
}
for i, k := range keys {
- if size <= c.config.MaxCollectionBytes {
+ if size <= c.config.MaxCollectionBytes/2 {
break
}
if expired[i] {
@@ -300,8 +412,8 @@ func (c *cache) pruneCollections() {
}
}
-// collectionBytes returns the approximate memory size of the
-// collection cache.
+// collectionBytes returns the approximate combined memory size of the
+// collection cache and session filesystem cache.
func (c *cache) collectionBytes() uint64 {
var size uint64
for _, k := range c.collections.Keys() {
@@ -311,6 +423,15 @@ func (c *cache) collectionBytes() uint64 {
}
size += uint64(len(v.(*cachedCollection).collection.ManifestText))
}
+ for _, token := range c.sessions.Keys() {
+ ent, ok := c.sessions.Peek(token)
+ if !ok {
+ continue
+ }
+ if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
+ size += uint64(fs.MemorySize())
+ }
+ }
return size
}
diff --git a/services/keep-web/handler.go b/services/keep-web/handler.go
index 2d6fb78f8..4ea2fa2f6 100644
--- a/services/keep-web/handler.go
+++ b/services/keep-web/handler.go
@@ -520,7 +520,8 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
func (h *handler) getClients(reqID, token string) (arv *arvadosclient.ArvadosClient, kc *keepclient.KeepClient, client *arvados.Client, release func(), err error) {
arv = h.clientPool.Get()
if arv == nil {
- return nil, nil, nil, nil, err
+ err = h.clientPool.Err()
+ return
}
release = func() { h.clientPool.Put(arv) }
arv.ApiToken = token
@@ -548,14 +549,11 @@ func (h *handler) serveSiteFS(w http.ResponseWriter, r *http.Request, tokens []s
http.Error(w, errReadOnly.Error(), http.StatusMethodNotAllowed)
return
}
- _, kc, client, release, err := h.getClients(r.Header.Get("X-Request-Id"), tokens[0])
+ fs, err := h.Config.Cache.GetSession(tokens[0])
if err != nil {
- http.Error(w, "Pool failed: "+h.clientPool.Err().Error(), http.StatusInternalServerError)
+ http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
- defer release()
-
- fs := client.SiteFileSystem(kc)
fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
f, err := fs.Open(r.URL.Path)
if os.IsNotExist(err) {
diff --git a/services/keep-web/main.go b/services/keep-web/main.go
index 647eab165..a62e0abb6 100644
--- a/services/keep-web/main.go
+++ b/services/keep-web/main.go
@@ -38,6 +38,7 @@ func newConfig(arvCfg *arvados.Config) *Config {
}
cfg.cluster = cls
cfg.Cache.config = &cfg.cluster.Collections.WebDAVCache
+ cfg.Cache.cluster = cls
return &cfg
}
diff --git a/services/keep-web/s3.go b/services/keep-web/s3.go
index 66f89db9e..3e82c5867 100644
--- a/services/keep-web/s3.go
+++ b/services/keep-web/s3.go
@@ -228,15 +228,29 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
return false
}
- _, kc, client, release, err := h.getClients(r.Header.Get("X-Request-Id"), token)
- if err != nil {
- s3ErrorResponse(w, InternalError, "Pool failed: "+h.clientPool.Err().Error(), r.URL.Path, http.StatusInternalServerError)
- return true
+ var err error
+ var fs arvados.CustomFileSystem
+ if r.Method == http.MethodGet || r.Method == http.MethodHead {
+ // Use a single session (cached FileSystem) across
+ // multiple read requests.
+ fs, err = h.Config.Cache.GetSession(token)
+ if err != nil {
+ s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
+ return true
+ }
+ } else {
+ // Create a FileSystem for this request, to avoid
+ // exposing incomplete write operations to concurrent
+ // requests.
+ _, kc, client, release, err := h.getClients(r.Header.Get("X-Request-Id"), token)
+ if err != nil {
+ s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
+ return true
+ }
+ defer release()
+ fs = client.SiteFileSystem(kc)
+ fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
}
- defer release()
-
- fs := client.SiteFileSystem(kc)
- fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
var objectNameGiven bool
var bucketName string
@@ -385,6 +399,8 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
return true
}
+ // Ensure a subsequent read operation will see the changes.
+ h.Config.Cache.ResetSession(token)
w.WriteHeader(http.StatusOK)
return true
case r.Method == http.MethodDelete:
@@ -432,11 +448,12 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
return true
}
+ // Ensure a subsequent read operation will see the changes.
+ h.Config.Cache.ResetSession(token)
w.WriteHeader(http.StatusNoContent)
return true
default:
s3ErrorResponse(w, InvalidRequest, "method not allowed", r.URL.Path, http.StatusMethodNotAllowed)
-
return true
}
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list