[ARVADOS] created: 2.1.0-119-gf3451bb69

Git user git at public.arvados.org
Wed Mar 10 15:29:42 UTC 2021


        at  f3451bb6907e8692fe8ef55af0174fc7ed36c8bc (commit)


commit f3451bb6907e8692fe8ef55af0174fc7ed36c8bc
Author: Tom Clegg <tom at curii.com>
Date:   Wed Mar 3 10:05:02 2021 -0500

    16745: Update test.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/services/keep-web/server_test.go b/services/keep-web/server_test.go
index 0a1c7d1b3..5c68eb424 100644
--- a/services/keep-web/server_test.go
+++ b/services/keep-web/server_test.go
@@ -395,7 +395,7 @@ func (s *IntegrationSuite) TestMetrics(c *check.C) {
 	c.Check(counters["arvados_keepweb_collectioncache_permission_hits//"].Value, check.Equals, int64(1))
 	c.Check(gauges["arvados_keepweb_collectioncache_cached_manifests//"].Value, check.Equals, float64(1))
 	// FooCollection's cached manifest size is 45 ("1f4b0....+45") plus one 51-byte blob signature
-	c.Check(gauges["arvados_keepweb_collectioncache_cached_manifest_bytes//"].Value, check.Equals, float64(45+51))
+	c.Check(gauges["arvados_keepweb_sessions_cached_collection_bytes//"].Value, check.Equals, float64(45+51))
 
 	// If the Host header indicates a collection, /metrics.json
 	// refers to a file in the collection -- the metrics handler

commit 2ae2378cb44c761ed5f3df4c97247d7fba862934
Author: Tom Clegg <tom at curii.com>
Date:   Tue Mar 2 22:25:11 2021 -0500

    16745: Rename session cache size metric.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/services/keep-web/cache.go b/services/keep-web/cache.go
index ec48be609..07db7a016 100644
--- a/services/keep-web/cache.go
+++ b/services/keep-web/cache.go
@@ -81,8 +81,8 @@ func (m *cacheMetrics) setup(reg *prometheus.Registry) {
 	reg.MustRegister(m.apiCalls)
 	m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
 		Namespace: "arvados",
-		Subsystem: "keepweb_collectioncache",
-		Name:      "cached_manifest_bytes",
+		Subsystem: "keepweb_sessions",
+		Name:      "cached_collection_bytes",
 		Help:      "Total size of all cached manifests and sessions.",
 	})
 	reg.MustRegister(m.collectionBytes)

commit 252e51a36db295cde96ebc48f72ee4aa6b48dd81
Author: Tom Clegg <tom at curii.com>
Date:   Tue Mar 2 16:27:18 2021 -0500

    16745: Reject unsupported APIs instead of mishandling.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/services/keep-web/s3.go b/services/keep-web/s3.go
index e6ee3e2dc..e08b2b844 100644
--- a/services/keep-web/s3.go
+++ b/services/keep-web/s3.go
@@ -102,6 +102,7 @@ func s3stringToSign(alg, scope, signedHeaders string, r *http.Request) (string,
 	normalizedURL := *r.URL
 	normalizedURL.RawPath = ""
 	normalizedURL.Path = reMultipleSlashChars.ReplaceAllString(normalizedURL.Path, "/")
+	ctxlog.FromContext(r.Context()).Infof("escapedPath %s", normalizedURL.EscapedPath())
 	canonicalRequest := fmt.Sprintf("%s\n%s\n%s\n%s\n%s\n%s", r.Method, normalizedURL.EscapedPath(), s3querystring(r.URL), canonicalHeaders, signedHeaders, r.Header.Get("X-Amz-Content-Sha256"))
 	ctxlog.FromContext(r.Context()).Debugf("s3stringToSign: canonicalRequest %s", canonicalRequest)
 	return fmt.Sprintf("%s\n%s\n%s\n%s", alg, r.Header.Get("X-Amz-Date"), scope, hashdigest(sha256.New(), canonicalRequest)), nil
@@ -206,6 +207,8 @@ var UnauthorizedAccess = "UnauthorizedAccess"
 var InvalidRequest = "InvalidRequest"
 var SignatureDoesNotMatch = "SignatureDoesNotMatch"
 
+var reRawQueryIndicatesAPI = regexp.MustCompile(`^[a-z]+(&|$)`)
+
 // serveS3 handles r and returns true if r is a request from an S3
 // client, otherwise it returns false.
 func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
@@ -277,13 +280,23 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
 			// GetBucketLocation
 			w.Header().Set("Content-Type", "application/xml")
 			io.WriteString(w, xml.Header)
-			fmt.Fprintln(w, `<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">`+h.Config.cluster.ClusterID+`</LocationConstraint>`)
+			fmt.Fprintln(w, `<LocationConstraint><LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">`+
+				h.Config.cluster.ClusterID+
+				`</LocationConstraint></LocationConstraint>`)
+		} else if reRawQueryIndicatesAPI.MatchString(r.URL.RawQuery) {
+			// GetBucketWebsite ("GET /bucketid/?website"), GetBucketTagging, etc.
+			s3ErrorResponse(w, InvalidRequest, "API not supported", r.URL.Path+"?"+r.URL.RawQuery, http.StatusBadRequest)
 		} else {
 			// ListObjects
 			h.s3list(bucketName, w, r, fs)
 		}
 		return true
 	case r.Method == http.MethodGet || r.Method == http.MethodHead:
+		if reRawQueryIndicatesAPI.MatchString(r.URL.RawQuery) {
+			// GetObjectRetention ("GET /bucketid/objectid?retention&versionId=..."), etc.
+			s3ErrorResponse(w, InvalidRequest, "API not supported", r.URL.Path+"?"+r.URL.RawQuery, http.StatusBadRequest)
+			return true
+		}
 		fi, err := fs.Stat(fspath)
 		if r.Method == "HEAD" && !objectNameGiven {
 			// HeadBucket
@@ -313,6 +326,11 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
 		http.FileServer(fs).ServeHTTP(w, &r)
 		return true
 	case r.Method == http.MethodPut:
+		if reRawQueryIndicatesAPI.MatchString(r.URL.RawQuery) {
+			// PutObjectAcl ("PUT /bucketid/objectid?acl&versionId=..."), etc.
+			s3ErrorResponse(w, InvalidRequest, "API not supported", r.URL.Path+"?"+r.URL.RawQuery, http.StatusBadRequest)
+			return true
+		}
 		if !objectNameGiven {
 			s3ErrorResponse(w, InvalidArgument, "Missing object name in PUT request.", r.URL.Path, http.StatusBadRequest)
 			return true
@@ -409,6 +427,11 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
 		w.WriteHeader(http.StatusOK)
 		return true
 	case r.Method == http.MethodDelete:
+		if reRawQueryIndicatesAPI.MatchString(r.URL.RawQuery) {
+			// DeleteObjectTagging ("DELETE /bucketid/objectid?tagging&versionId=..."), etc.
+			s3ErrorResponse(w, InvalidRequest, "API not supported", r.URL.Path+"?"+r.URL.RawQuery, http.StatusBadRequest)
+			return true
+		}
 		if !objectNameGiven || r.URL.Path == "/" {
 			s3ErrorResponse(w, InvalidArgument, "missing object name in DELETE request", r.URL.Path, http.StatusBadRequest)
 			return true
diff --git a/services/keep-web/s3_test.go b/services/keep-web/s3_test.go
index 3a4c4b224..6de36b6b2 100644
--- a/services/keep-web/s3_test.go
+++ b/services/keep-web/s3_test.go
@@ -76,7 +76,7 @@ func (s *IntegrationSuite) s3setup(c *check.C) s3stage {
 
 	auth := aws.NewAuth(arvadostest.ActiveTokenUUID, arvadostest.ActiveToken, "", time.Now().Add(time.Hour))
 	region := aws.Region{
-		Name:       s.testServer.Addr,
+		Name:       "zzzzz",
 		S3Endpoint: "http://" + s.testServer.Addr,
 	}
 	client := s3.New(*auth, region)
@@ -451,7 +451,7 @@ func (stage *s3stage) writeBigDirs(c *check.C, dirs int, filesPerDir int) {
 }
 
 func (s *IntegrationSuite) sign(c *check.C, req *http.Request, key, secret string) {
-	scope := "20200202/region/service/aws4_request"
+	scope := "20200202/zzzzz/service/aws4_request"
 	signedHeaders := "date"
 	req.Header.Set("Date", time.Now().UTC().Format(time.RFC1123))
 	stringToSign, err := s3stringToSign(s3SignAlgorithm, scope, signedHeaders, req)
@@ -556,7 +556,7 @@ func (s *IntegrationSuite) TestS3NormalizeURIForSignature(c *check.C) {
 		{"/foo%5bbar", "/foo%5Bbar"}, // %XX must be uppercase
 	} {
 		date := time.Now().UTC().Format("20060102T150405Z")
-		scope := "20200202/fakeregion/S3/aws4_request"
+		scope := "20200202/zzzzz/S3/aws4_request"
 		canonicalRequest := fmt.Sprintf("%s\n%s\n%s\n%s\n%s\n%s", "GET", trial.normalizedPath, "", "host:host.example.com\n", "host", "")
 		c.Logf("canonicalRequest %q", canonicalRequest)
 		expect := fmt.Sprintf("%s\n%s\n%s\n%s", s3SignAlgorithm, date, scope, hashdigest(sha256.New(), canonicalRequest))
@@ -574,6 +574,23 @@ func (s *IntegrationSuite) TestS3NormalizeURIForSignature(c *check.C) {
 	}
 }
 
+func (s *IntegrationSuite) TestS3GetBucketLocation(c *check.C) {
+	stage := s.s3setup(c)
+	defer stage.teardown(c)
+	for _, bucket := range []*s3.Bucket{stage.collbucket, stage.projbucket} {
+		req, err := http.NewRequest("GET", bucket.URL("/"), nil)
+		c.Check(err, check.IsNil)
+		req.Header.Set("Authorization", "AWS "+arvadostest.ActiveTokenV2+":none")
+		req.URL.RawQuery = "location"
+		resp, err := http.DefaultClient.Do(req)
+		c.Assert(err, check.IsNil)
+		c.Check(resp.Header.Get("Content-Type"), check.Equals, "application/xml")
+		buf, err := ioutil.ReadAll(resp.Body)
+		c.Assert(err, check.IsNil)
+		c.Check(string(buf), check.Equals, "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<LocationConstraint><LocationConstraint xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">zzzzz</LocationConstraint></LocationConstraint>\n")
+	}
+}
+
 func (s *IntegrationSuite) TestS3GetBucketVersioning(c *check.C) {
 	stage := s.s3setup(c)
 	defer stage.teardown(c)
@@ -591,6 +608,37 @@ func (s *IntegrationSuite) TestS3GetBucketVersioning(c *check.C) {
 	}
 }
 
+func (s *IntegrationSuite) TestS3UnsupportedAPIs(c *check.C) {
+	stage := s.s3setup(c)
+	defer stage.teardown(c)
+	for _, trial := range []struct {
+		method   string
+		path     string
+		rawquery string
+	}{
+		{"GET", "/", "acl&versionId=1234"},    // GetBucketAcl
+		{"GET", "/foo", "acl&versionId=1234"}, // GetObjectAcl
+		{"PUT", "/", "acl"},                   // PutBucketAcl
+		{"PUT", "/foo", "acl"},                // PutObjectAcl
+		{"DELETE", "/", "tagging"},            // DeleteBucketTagging
+		{"DELETE", "/foo", "tagging"},         // DeleteObjectTagging
+	} {
+		for _, bucket := range []*s3.Bucket{stage.collbucket, stage.projbucket} {
+			c.Logf("trial %v bucket %v", trial, bucket)
+			req, err := http.NewRequest(trial.method, bucket.URL(trial.path), nil)
+			c.Check(err, check.IsNil)
+			req.Header.Set("Authorization", "AWS "+arvadostest.ActiveTokenV2+":none")
+			req.URL.RawQuery = trial.rawquery
+			resp, err := http.DefaultClient.Do(req)
+			c.Assert(err, check.IsNil)
+			c.Check(resp.Header.Get("Content-Type"), check.Equals, "application/xml")
+			buf, err := ioutil.ReadAll(resp.Body)
+			c.Assert(err, check.IsNil)
+			c.Check(string(buf), check.Matches, "(?ms).*InvalidRequest.*API not supported.*")
+		}
+	}
+}
+
 // If there are no CommonPrefixes entries, the CommonPrefixes XML tag
 // should not appear at all.
 func (s *IntegrationSuite) TestS3ListNoCommonPrefixes(c *check.C) {

commit a0bf72872492dd3b4a89e5bc359f480cbb4c76fb
Author: Tom Clegg <tom at curii.com>
Date:   Tue Mar 2 11:12:11 2021 -0500

    16745: Handle GetBucketLocation API.
    
    Previously misinterpreted as ListObjects with no delimiter, which is
    very slow.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/services/keep-web/s3.go b/services/keep-web/s3.go
index 3e82c5867..e6ee3e2dc 100644
--- a/services/keep-web/s3.go
+++ b/services/keep-web/s3.go
@@ -273,6 +273,11 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
 			w.Header().Set("Content-Type", "application/xml")
 			io.WriteString(w, xml.Header)
 			fmt.Fprintln(w, `<VersioningConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/"/>`)
+		} else if _, ok = r.URL.Query()["location"]; ok {
+			// GetBucketLocation
+			w.Header().Set("Content-Type", "application/xml")
+			io.WriteString(w, xml.Header)
+			fmt.Fprintln(w, `<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">`+h.Config.cluster.ClusterID+`</LocationConstraint>`)
 		} else {
 			// ListObjects
 			h.s3list(bucketName, w, r, fs)

commit 559cf52d3c14d03b09e56f0beb337a3d57b01918
Author: Tom Clegg <tom at curii.com>
Date:   Tue Mar 2 09:55:16 2021 -0500

    16745: Don't store nil in an atomic.Value (panic).
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/services/keep-web/cache.go b/services/keep-web/cache.go
index af143c77e..ec48be609 100644
--- a/services/keep-web/cache.go
+++ b/services/keep-web/cache.go
@@ -218,6 +218,7 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, error) {
 	now := time.Now()
 	ent, _ := c.sessions.Get(token)
 	sess, _ := ent.(*cachedSession)
+	expired := false
 	if sess == nil {
 		c.metrics.sessionMisses.Inc()
 		sess = &cachedSession{
@@ -226,13 +227,13 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, error) {
 		c.sessions.Add(token, sess)
 	} else if sess.expire.Before(now) {
 		c.metrics.sessionMisses.Inc()
-		sess.fs.Store(nil)
+		expired = true
 	} else {
 		c.metrics.sessionHits.Inc()
 	}
 	go c.pruneSessions()
 	fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
-	if fs != nil {
+	if fs != nil && !expired {
 		return fs, nil
 	}
 	ac, err := arvados.NewClientFromConfig(c.cluster)

commit 47afa97d0e5220423c9101acb8ca69420e260e50
Author: Tom Clegg <tom at curii.com>
Date:   Mon Feb 22 21:46:27 2021 -0500

    16745: Prune enough sessions to reach size limit, not all.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/services/keep-web/cache.go b/services/keep-web/cache.go
index 71e995330..af143c77e 100644
--- a/services/keep-web/cache.go
+++ b/services/keep-web/cache.go
@@ -83,7 +83,7 @@ func (m *cacheMetrics) setup(reg *prometheus.Registry) {
 		Namespace: "arvados",
 		Subsystem: "keepweb_collectioncache",
 		Name:      "cached_manifest_bytes",
-		Help:      "Total size of all manifests in cache.",
+		Help:      "Total size of all cached manifests and sessions.",
 	})
 	reg.MustRegister(m.collectionBytes)
 	m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
@@ -251,10 +251,13 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, error) {
 	return fs, nil
 }
 
+// Remove all expired session cache entries, then remove more entries
+// until approximate remaining size <= maxsize/2
 func (c *cache) pruneSessions() {
 	now := time.Now()
 	var size int64
-	for _, token := range c.sessions.Keys() {
+	keys := c.sessions.Keys()
+	for _, token := range keys {
 		ent, ok := c.sessions.Peek(token)
 		if !ok {
 			continue
@@ -264,16 +267,28 @@ func (c *cache) pruneSessions() {
 			c.sessions.Remove(token)
 			continue
 		}
+		if fs, ok := s.fs.Load().(arvados.CustomFileSystem); ok {
+			size += fs.MemorySize()
+		}
+	}
+	// Remove tokens until reaching size limit, starting with the
+	// least frequently used entries (which Keys() returns last).
+	for i := len(keys) - 1; i >= 0; i-- {
+		token := keys[i]
+		if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
+			break
+		}
+		ent, ok := c.sessions.Peek(token)
+		if !ok {
+			continue
+		}
+		s := ent.(*cachedSession)
 		fs, _ := s.fs.Load().(arvados.CustomFileSystem)
 		if fs == nil {
 			continue
 		}
-		size += fs.MemorySize()
-	}
-	if size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
-		for _, token := range c.sessions.Keys() {
-			c.sessions.Remove(token)
-		}
+		c.sessions.Remove(token)
+		size -= fs.MemorySize()
 	}
 }
 

commit 0b714d15c15340f3f52c331528d5cbb4cc422f07
Author: Tom Clegg <tom at curii.com>
Date:   Mon Feb 22 11:10:10 2021 -0500

    16745: Keep a SiteFileSystem alive for multiple read requests.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 3ecff5448..1d3fcb748 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -523,21 +523,30 @@ Clusters:
       TrustAllContent: false
 
       # Cache parameters for WebDAV content serving:
-      # * TTL: Maximum time to cache manifests and permission checks.
-      # * UUIDTTL: Maximum time to cache collection state.
-      # * MaxBlockEntries: Maximum number of block cache entries.
-      # * MaxCollectionEntries: Maximum number of collection cache entries.
-      # * MaxCollectionBytes: Approximate memory limit for collection cache.
-      # * MaxPermissionEntries: Maximum number of permission cache entries.
-      # * MaxUUIDEntries: Maximum number of UUID cache entries.
       WebDAVCache:
+        # Time to cache manifests, permission checks, and sessions.
         TTL: 300s
+
+        # Time to cache collection state.
         UUIDTTL: 5s
-        MaxBlockEntries:      4
+
+        # Block cache entries. Each block consumes up to 64 MiB RAM.
+        MaxBlockEntries: 4
+
+        # Collection cache entries.
         MaxCollectionEntries: 1000
-        MaxCollectionBytes:   100000000
+
+        # Approximate memory limit (in bytes) for collection cache.
+        MaxCollectionBytes: 100000000
+
+        # Permission cache entries.
         MaxPermissionEntries: 1000
-        MaxUUIDEntries:       1000
+
+        # UUID cache entries.
+        MaxUUIDEntries: 1000
+
+        # Persistent sessions.
+        MaxSessions: 100
 
     Login:
       # One of the following mechanisms (SSO, Google, PAM, LDAP, or
diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go
index cd752ab75..949a842c1 100644
--- a/lib/config/generated_config.go
+++ b/lib/config/generated_config.go
@@ -529,21 +529,30 @@ Clusters:
       TrustAllContent: false
 
       # Cache parameters for WebDAV content serving:
-      # * TTL: Maximum time to cache manifests and permission checks.
-      # * UUIDTTL: Maximum time to cache collection state.
-      # * MaxBlockEntries: Maximum number of block cache entries.
-      # * MaxCollectionEntries: Maximum number of collection cache entries.
-      # * MaxCollectionBytes: Approximate memory limit for collection cache.
-      # * MaxPermissionEntries: Maximum number of permission cache entries.
-      # * MaxUUIDEntries: Maximum number of UUID cache entries.
       WebDAVCache:
+        # Time to cache manifests, permission checks, and sessions.
         TTL: 300s
+
+        # Time to cache collection state.
         UUIDTTL: 5s
-        MaxBlockEntries:      4
+
+        # Block cache entries. Each block consumes up to 64 MiB RAM.
+        MaxBlockEntries: 4
+
+        # Collection cache entries.
         MaxCollectionEntries: 1000
-        MaxCollectionBytes:   100000000
+
+        # Approximate memory limit (in bytes) for collection cache.
+        MaxCollectionBytes: 100000000
+
+        # Permission cache entries.
         MaxPermissionEntries: 1000
-        MaxUUIDEntries:       1000
+
+        # UUID cache entries.
+        MaxUUIDEntries: 1000
+
+        # Persistent sessions.
+        MaxSessions: 100
 
     Login:
       # One of the following mechanisms (SSO, Google, PAM, LDAP, or
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index 2e39985c2..6fe7a022b 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -65,6 +65,7 @@ type WebDAVCacheConfig struct {
 	MaxCollectionBytes   int64
 	MaxPermissionEntries int
 	MaxUUIDEntries       int
+	MaxSessions          int
 }
 
 type Cluster struct {
diff --git a/sdk/go/arvados/fs_base.go b/sdk/go/arvados/fs_base.go
index aa75fee7c..2478641df 100644
--- a/sdk/go/arvados/fs_base.go
+++ b/sdk/go/arvados/fs_base.go
@@ -106,6 +106,9 @@ type FileSystem interface {
 	// path is "", flush all dirs/streams; otherwise, flush only
 	// the specified dir/stream.
 	Flush(path string, shortBlocks bool) error
+
+	// Estimate current memory usage.
+	MemorySize() int64
 }
 
 type inode interface {
@@ -156,6 +159,7 @@ type inode interface {
 	sync.Locker
 	RLock()
 	RUnlock()
+	MemorySize() int64
 }
 
 type fileinfo struct {
@@ -229,6 +233,13 @@ func (*nullnode) Child(name string, replace func(inode) (inode, error)) (inode,
 	return nil, ErrNotADirectory
 }
 
+func (*nullnode) MemorySize() int64 {
+	// Types that embed nullnode should report their own size, but
+	// if they don't, we at least report a non-zero size to ensure
+	// a large tree doesn't get reported as 0 bytes.
+	return 64
+}
+
 type treenode struct {
 	fs       FileSystem
 	parent   inode
@@ -319,6 +330,15 @@ func (n *treenode) Sync() error {
 	return nil
 }
 
+func (n *treenode) MemorySize() (size int64) {
+	n.RLock()
+	defer n.RUnlock()
+	for _, inode := range n.inodes {
+		size += inode.MemorySize()
+	}
+	return
+}
+
 type fileSystem struct {
 	root inode
 	fsBackend
@@ -607,6 +627,10 @@ func (fs *fileSystem) Flush(string, bool) error {
 	return ErrInvalidOperation
 }
 
+func (fs *fileSystem) MemorySize() int64 {
+	return fs.root.MemorySize()
+}
+
 // rlookup (recursive lookup) returns the inode for the file/directory
 // with the given name (which may contain "/" separators). If no such
 // file/directory exists, the returned node is nil.
diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index 1de558a1b..0233826a7 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -38,9 +38,6 @@ type CollectionFileSystem interface {
 
 	// Total data bytes in all files.
 	Size() int64
-
-	// Memory consumed by buffered file data.
-	memorySize() int64
 }
 
 type collectionFileSystem struct {
@@ -232,10 +229,10 @@ func (fs *collectionFileSystem) Flush(path string, shortBlocks bool) error {
 	return dn.flush(context.TODO(), names, flushOpts{sync: false, shortBlocks: shortBlocks})
 }
 
-func (fs *collectionFileSystem) memorySize() int64 {
+func (fs *collectionFileSystem) MemorySize() int64 {
 	fs.fileSystem.root.Lock()
 	defer fs.fileSystem.root.Unlock()
-	return fs.fileSystem.root.(*dirnode).memorySize()
+	return fs.fileSystem.root.(*dirnode).MemorySize()
 }
 
 func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
@@ -879,14 +876,14 @@ func (dn *dirnode) flush(ctx context.Context, names []string, opts flushOpts) er
 }
 
 // caller must have write lock.
-func (dn *dirnode) memorySize() (size int64) {
+func (dn *dirnode) MemorySize() (size int64) {
 	for _, name := range dn.sortedNames() {
 		node := dn.inodes[name]
 		node.Lock()
 		defer node.Unlock()
 		switch node := node.(type) {
 		case *dirnode:
-			size += node.memorySize()
+			size += node.MemorySize()
 		case *filenode:
 			for _, seg := range node.segments {
 				switch seg := seg.(type) {
diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index 59a6a6ba8..05c8ea61a 100644
--- a/sdk/go/arvados/fs_collection_test.go
+++ b/sdk/go/arvados/fs_collection_test.go
@@ -1153,9 +1153,9 @@ func (s *CollectionFSSuite) TestFlushAll(c *check.C) {
 			fs.Flush("", true)
 		}
 
-		size := fs.memorySize()
+		size := fs.MemorySize()
 		if !c.Check(size <= 1<<24, check.Equals, true) {
-			c.Logf("at dir%d fs.memorySize()=%d", i, size)
+			c.Logf("at dir%d fs.MemorySize()=%d", i, size)
 			return
 		}
 	}
@@ -1188,13 +1188,13 @@ func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
 			c.Assert(err, check.IsNil)
 		}
 	}
-	c.Check(fs.memorySize(), check.Equals, int64(nDirs*67<<20))
+	c.Check(fs.MemorySize(), check.Equals, int64(nDirs*67<<20))
 	c.Check(flushed, check.Equals, int64(0))
 
 	waitForFlush := func(expectUnflushed, expectFlushed int64) {
-		for deadline := time.Now().Add(5 * time.Second); fs.memorySize() > expectUnflushed && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
+		for deadline := time.Now().Add(5 * time.Second); fs.MemorySize() > expectUnflushed && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
 		}
-		c.Check(fs.memorySize(), check.Equals, expectUnflushed)
+		c.Check(fs.MemorySize(), check.Equals, expectUnflushed)
 		c.Check(flushed, check.Equals, expectFlushed)
 	}
 
diff --git a/sdk/go/arvados/fs_deferred.go b/sdk/go/arvados/fs_deferred.go
index 254b90c81..bb6c7a262 100644
--- a/sdk/go/arvados/fs_deferred.go
+++ b/sdk/go/arvados/fs_deferred.go
@@ -112,3 +112,4 @@ func (dn *deferrednode) RLock()                          { dn.realinode().RLock(
 func (dn *deferrednode) RUnlock()                        { dn.realinode().RUnlock() }
 func (dn *deferrednode) FS() FileSystem                  { return dn.currentinode().FS() }
 func (dn *deferrednode) Parent() inode                   { return dn.currentinode().Parent() }
+func (dn *deferrednode) MemorySize() int64               { return dn.currentinode().MemorySize() }
diff --git a/services/keep-web/cache.go b/services/keep-web/cache.go
index eeb78ad90..71e995330 100644
--- a/services/keep-web/cache.go
+++ b/services/keep-web/cache.go
@@ -6,23 +6,27 @@ package main
 
 import (
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
-	"github.com/hashicorp/golang-lru"
+	"git.arvados.org/arvados.git/sdk/go/keepclient"
+	lru "github.com/hashicorp/golang-lru"
 	"github.com/prometheus/client_golang/prometheus"
 )
 
 const metricsUpdateInterval = time.Second / 10
 
 type cache struct {
-	config      *arvados.WebDAVCacheConfig
+	cluster     *arvados.Cluster
+	config      *arvados.WebDAVCacheConfig // TODO: use cluster.Collections.WebDAVCache instead
 	registry    *prometheus.Registry
 	metrics     cacheMetrics
 	pdhs        *lru.TwoQueueCache
 	collections *lru.TwoQueueCache
 	permissions *lru.TwoQueueCache
+	sessions    *lru.TwoQueueCache
 	setupOnce   sync.Once
 }
 
@@ -30,9 +34,12 @@ type cacheMetrics struct {
 	requests          prometheus.Counter
 	collectionBytes   prometheus.Gauge
 	collectionEntries prometheus.Gauge
+	sessionEntries    prometheus.Gauge
 	collectionHits    prometheus.Counter
 	pdhHits           prometheus.Counter
 	permissionHits    prometheus.Counter
+	sessionHits       prometheus.Counter
+	sessionMisses     prometheus.Counter
 	apiCalls          prometheus.Counter
 }
 
@@ -86,6 +93,27 @@ func (m *cacheMetrics) setup(reg *prometheus.Registry) {
 		Help:      "Number of manifests in cache.",
 	})
 	reg.MustRegister(m.collectionEntries)
+	m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace: "arvados",
+		Subsystem: "keepweb_sessions",
+		Name:      "active",
+		Help:      "Number of active token sessions.",
+	})
+	reg.MustRegister(m.sessionEntries)
+	m.sessionHits = prometheus.NewCounter(prometheus.CounterOpts{
+		Namespace: "arvados",
+		Subsystem: "keepweb_sessions",
+		Name:      "hits",
+		Help:      "Number of token session cache hits.",
+	})
+	reg.MustRegister(m.sessionHits)
+	m.sessionMisses = prometheus.NewCounter(prometheus.CounterOpts{
+		Namespace: "arvados",
+		Subsystem: "keepweb_sessions",
+		Name:      "misses",
+		Help:      "Number of token session cache misses.",
+	})
+	reg.MustRegister(m.sessionMisses)
 }
 
 type cachedPDH struct {
@@ -102,6 +130,11 @@ type cachedPermission struct {
 	expire time.Time
 }
 
+type cachedSession struct {
+	expire time.Time
+	fs     atomic.Value
+}
+
 func (c *cache) setup() {
 	var err error
 	c.pdhs, err = lru.New2Q(c.config.MaxUUIDEntries)
@@ -116,6 +149,10 @@ func (c *cache) setup() {
 	if err != nil {
 		panic(err)
 	}
+	c.sessions, err = lru.New2Q(c.config.MaxSessions)
+	if err != nil {
+		panic(err)
+	}
 
 	reg := c.registry
 	if reg == nil {
@@ -132,6 +169,7 @@ func (c *cache) setup() {
 func (c *cache) updateGauges() {
 	c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
 	c.metrics.collectionEntries.Set(float64(c.collections.Len()))
+	c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
 }
 
 var selectPDH = map[string]interface{}{
@@ -165,6 +203,80 @@ func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvad
 	return err
 }
 
+// ResetSession unloads any potentially stale state. Should be called
+// after write operations, so subsequent reads don't return stale
+// data.
+func (c *cache) ResetSession(token string) {
+	c.setupOnce.Do(c.setup)
+	c.sessions.Remove(token)
+}
+
+// Get a long-lived CustomFileSystem suitable for doing a read operation
+// with the given token.
+func (c *cache) GetSession(token string) (arvados.CustomFileSystem, error) {
+	c.setupOnce.Do(c.setup)
+	now := time.Now()
+	ent, _ := c.sessions.Get(token)
+	sess, _ := ent.(*cachedSession)
+	if sess == nil {
+		c.metrics.sessionMisses.Inc()
+		sess = &cachedSession{
+			expire: now.Add(c.config.TTL.Duration()),
+		}
+		c.sessions.Add(token, sess)
+	} else if sess.expire.Before(now) {
+		c.metrics.sessionMisses.Inc()
+		sess.fs.Store(nil)
+	} else {
+		c.metrics.sessionHits.Inc()
+	}
+	go c.pruneSessions()
+	fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
+	if fs != nil {
+		return fs, nil
+	}
+	ac, err := arvados.NewClientFromConfig(c.cluster)
+	if err != nil {
+		return nil, err
+	}
+	ac.AuthToken = token
+	arv, err := arvadosclient.New(ac)
+	if err != nil {
+		return nil, err
+	}
+	kc := keepclient.New(arv)
+	fs = ac.SiteFileSystem(kc)
+	fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
+	sess.fs.Store(fs)
+	return fs, nil
+}
+
+func (c *cache) pruneSessions() {
+	now := time.Now()
+	var size int64
+	for _, token := range c.sessions.Keys() {
+		ent, ok := c.sessions.Peek(token)
+		if !ok {
+			continue
+		}
+		s := ent.(*cachedSession)
+		if s.expire.Before(now) {
+			c.sessions.Remove(token)
+			continue
+		}
+		fs, _ := s.fs.Load().(arvados.CustomFileSystem)
+		if fs == nil {
+			continue
+		}
+		size += fs.MemorySize()
+	}
+	if size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
+		for _, token := range c.sessions.Keys() {
+			c.sessions.Remove(token)
+		}
+	}
+}
+
 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
 	c.setupOnce.Do(c.setup)
 	c.metrics.requests.Inc()
@@ -288,7 +400,7 @@ func (c *cache) pruneCollections() {
 		}
 	}
 	for i, k := range keys {
-		if size <= c.config.MaxCollectionBytes {
+		if size <= c.config.MaxCollectionBytes/2 {
 			break
 		}
 		if expired[i] {
@@ -300,8 +412,8 @@ func (c *cache) pruneCollections() {
 	}
 }
 
-// collectionBytes returns the approximate memory size of the
-// collection cache.
+// collectionBytes returns the approximate combined memory size of the
+// collection cache and session filesystem cache.
 func (c *cache) collectionBytes() uint64 {
 	var size uint64
 	for _, k := range c.collections.Keys() {
@@ -311,6 +423,15 @@ func (c *cache) collectionBytes() uint64 {
 		}
 		size += uint64(len(v.(*cachedCollection).collection.ManifestText))
 	}
+	for _, token := range c.sessions.Keys() {
+		ent, ok := c.sessions.Peek(token)
+		if !ok {
+			continue
+		}
+		if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
+			size += uint64(fs.MemorySize())
+		}
+	}
 	return size
 }
 
diff --git a/services/keep-web/handler.go b/services/keep-web/handler.go
index 2d6fb78f8..4ea2fa2f6 100644
--- a/services/keep-web/handler.go
+++ b/services/keep-web/handler.go
@@ -520,7 +520,8 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
 func (h *handler) getClients(reqID, token string) (arv *arvadosclient.ArvadosClient, kc *keepclient.KeepClient, client *arvados.Client, release func(), err error) {
 	arv = h.clientPool.Get()
 	if arv == nil {
-		return nil, nil, nil, nil, err
+		err = h.clientPool.Err()
+		return
 	}
 	release = func() { h.clientPool.Put(arv) }
 	arv.ApiToken = token
@@ -548,14 +549,11 @@ func (h *handler) serveSiteFS(w http.ResponseWriter, r *http.Request, tokens []s
 		http.Error(w, errReadOnly.Error(), http.StatusMethodNotAllowed)
 		return
 	}
-	_, kc, client, release, err := h.getClients(r.Header.Get("X-Request-Id"), tokens[0])
+	fs, err := h.Config.Cache.GetSession(tokens[0])
 	if err != nil {
-		http.Error(w, "Pool failed: "+h.clientPool.Err().Error(), http.StatusInternalServerError)
+		http.Error(w, err.Error(), http.StatusInternalServerError)
 		return
 	}
-	defer release()
-
-	fs := client.SiteFileSystem(kc)
 	fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
 	f, err := fs.Open(r.URL.Path)
 	if os.IsNotExist(err) {
diff --git a/services/keep-web/main.go b/services/keep-web/main.go
index 647eab165..a62e0abb6 100644
--- a/services/keep-web/main.go
+++ b/services/keep-web/main.go
@@ -38,6 +38,7 @@ func newConfig(arvCfg *arvados.Config) *Config {
 	}
 	cfg.cluster = cls
 	cfg.Cache.config = &cfg.cluster.Collections.WebDAVCache
+	cfg.Cache.cluster = cls
 	return &cfg
 }
 
diff --git a/services/keep-web/s3.go b/services/keep-web/s3.go
index 66f89db9e..3e82c5867 100644
--- a/services/keep-web/s3.go
+++ b/services/keep-web/s3.go
@@ -228,15 +228,29 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
 		return false
 	}
 
-	_, kc, client, release, err := h.getClients(r.Header.Get("X-Request-Id"), token)
-	if err != nil {
-		s3ErrorResponse(w, InternalError, "Pool failed: "+h.clientPool.Err().Error(), r.URL.Path, http.StatusInternalServerError)
-		return true
+	var err error
+	var fs arvados.CustomFileSystem
+	if r.Method == http.MethodGet || r.Method == http.MethodHead {
+		// Use a single session (cached FileSystem) across
+		// multiple read requests.
+		fs, err = h.Config.Cache.GetSession(token)
+		if err != nil {
+			s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
+			return true
+		}
+	} else {
+		// Create a FileSystem for this request, to avoid
+		// exposing incomplete write operations to concurrent
+		// requests.
+		_, kc, client, release, err := h.getClients(r.Header.Get("X-Request-Id"), token)
+		if err != nil {
+			s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
+			return true
+		}
+		defer release()
+		fs = client.SiteFileSystem(kc)
+		fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
 	}
-	defer release()
-
-	fs := client.SiteFileSystem(kc)
-	fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
 
 	var objectNameGiven bool
 	var bucketName string
@@ -385,6 +399,8 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
 			s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
 			return true
 		}
+		// Ensure a subsequent read operation will see the changes.
+		h.Config.Cache.ResetSession(token)
 		w.WriteHeader(http.StatusOK)
 		return true
 	case r.Method == http.MethodDelete:
@@ -432,11 +448,12 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
 			s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
 			return true
 		}
+		// Ensure a subsequent read operation will see the changes.
+		h.Config.Cache.ResetSession(token)
 		w.WriteHeader(http.StatusNoContent)
 		return true
 	default:
 		s3ErrorResponse(w, InvalidRequest, "method not allowed", r.URL.Path, http.StatusMethodNotAllowed)
-
 		return true
 	}
 }

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list