[arvados] created: 2.1.0-2865-g6b0d4ac8d
git repository hosting
git at public.arvados.org
Mon Sep 12 18:50:02 UTC 2022
at 6b0d4ac8df4b5b4255eb56b1d76865f06089ca2a (commit)
commit 6b0d4ac8df4b5b4255eb56b1d76865f06089ca2a
Author: Tom Clegg <tom at curii.com>
Date: Mon Sep 12 14:49:37 2022 -0400
19362: Move non-S3 collection-addressed reqs to sitefs code path.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/arvados/fs_base.go b/sdk/go/arvados/fs_base.go
index 2ad4d1f85..b0096140c 100644
--- a/sdk/go/arvados/fs_base.go
+++ b/sdk/go/arvados/fs_base.go
@@ -631,7 +631,15 @@ func (fs *fileSystem) Rename(oldname, newname string) error {
}
locked := map[sync.Locker]bool{}
for i := len(needLock) - 1; i >= 0; i-- {
- if n := needLock[i]; !locked[n] {
+ n := needLock[i]
+ if fs, ok := n.(FileSystem); ok {
+ // Lock the fs's root dir directly, not
+ // through the fs. Otherwise our "locked" map
+ // would not reliably prevent double-locking
+ // the fs's root dir.
+ n = fs.rootnode()
+ }
+ if !locked[n] {
n.Lock()
defer n.Unlock()
locked[n] = true
diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index 26012e240..256f513cf 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -290,44 +290,70 @@ func (fs *collectionFileSystem) Truncate(int64) error {
return ErrInvalidOperation
}
-// Check for and incorporate upstream changes -- unless that has
-// already been done recently, in which case this func is a no-op.
-func (fs *collectionFileSystem) checkChangesOnServer() error {
+// Check for and incorporate upstream changes. If force==false, this
+// is a no-op except once every ttl/100 or so.
+//
+// Return value is true if new content was loaded from upstream and
+// any unsaved local changes have been discarded.
+func (fs *collectionFileSystem) checkChangesOnServer(force bool) (bool, error) {
if fs.uuid == "" && fs.savedPDH.Load() == "" {
- return nil
+ return false, nil
}
- // First try UUID if any, then last known PDH. Stop if all
- // signatures are new enough.
- checkingAll := false
- for _, id := range []string{fs.uuid, fs.savedPDH.Load().(string)} {
- if id == "" {
- continue
- }
-
- fs.lockCheckChanges.Lock()
- if !checkingAll && fs.holdCheckChanges.After(time.Now()) {
- fs.lockCheckChanges.Unlock()
- return nil
- }
- remain, ttl := fs.signatureTimeLeft()
- if remain > 0.01 && !checkingAll {
- fs.holdCheckChanges = time.Now().Add(ttl / 100)
- }
+ fs.lockCheckChanges.Lock()
+ if !force && fs.holdCheckChanges.After(time.Now()) {
fs.lockCheckChanges.Unlock()
+ return false, nil
+ }
+ remain, ttl := fs.signatureTimeLeft()
+ if remain > 0.01 {
+ fs.holdCheckChanges = time.Now().Add(ttl / 100)
+ }
+ fs.lockCheckChanges.Unlock()
- if remain >= 0.5 {
- break
+ if !force && remain >= 0.5 {
+ // plenty of time left on current signatures
+ return false, nil
+ }
+
+ getparams := map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}}
+ if fs.uuid != "" {
+ var coll Collection
+ err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+fs.uuid, nil, getparams)
+ if err != nil {
+ return false, err
+ }
+ if coll.PortableDataHash != fs.savedPDH.Load().(string) {
+ // collection has changed upstream since we
+ // last loaded or saved. Refresh local data,
+ // losing any unsaved local changes.
+ newfs, err := coll.FileSystem(fs.fileSystem.fsBackend, fs.fileSystem.fsBackend)
+ if err != nil {
+ return false, err
+ }
+ snap, err := Snapshot(newfs, "/")
+ if err != nil {
+ return false, err
+ }
+ err = Splice(fs, "/", snap)
+ if err != nil {
+ return false, err
+ }
+ fs.savedPDH.Store(coll.PortableDataHash)
+ return true, nil
}
- checkingAll = true
+ fs.updateSignatures(coll.ManifestText)
+ return false, nil
+ }
+ if pdh := fs.savedPDH.Load().(string); pdh != "" {
var coll Collection
- err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}})
+ err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+pdh, nil, getparams)
if err != nil {
- continue
+ return false, err
}
fs.updateSignatures(coll.ManifestText)
}
- return nil
+ return false, nil
}
// Refresh signature on a single locator, if necessary. Assume caller
@@ -339,7 +365,7 @@ func (fs *collectionFileSystem) refreshSignature(locator string) string {
if err != nil || exp.Sub(time.Now()) > time.Minute {
// Synchronous update is not needed. Start an
// asynchronous update if needed.
- go fs.checkChangesOnServer()
+ go fs.checkChangesOnServer(false)
return locator
}
var manifests string
@@ -368,11 +394,11 @@ func (fs *collectionFileSystem) refreshSignature(locator string) string {
}
func (fs *collectionFileSystem) Sync() error {
- err := fs.checkChangesOnServer()
+ refreshed, err := fs.checkChangesOnServer(true)
if err != nil {
return err
}
- if fs.uuid == "" {
+ if refreshed || fs.uuid == "" {
return nil
}
txt, err := fs.MarshalManifest(".")
@@ -403,7 +429,7 @@ func (fs *collectionFileSystem) Sync() error {
"select": selectFields,
})
if err != nil {
- return fmt.Errorf("sync failed: update %s: %s", fs.uuid, err)
+ return fmt.Errorf("sync failed: update %s: %w", fs.uuid, err)
}
fs.updateSignatures(coll.ManifestText)
fs.savedPDH.Store(coll.PortableDataHash)
@@ -1163,10 +1189,11 @@ func (dn *dirnode) MemorySize() (size int64) {
size += 64
for _, seg := range node.segments {
switch seg := seg.(type) {
+ case storedSegment:
+ size += int64(len(seg.locator)) + 40
case *memSegment:
- size += int64(seg.Len())
+ size += int64(seg.Len()) + 8
}
- size += 64
}
}
}
diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index c2cac3c6c..da3166509 100644
--- a/sdk/go/arvados/fs_collection_test.go
+++ b/sdk/go/arvados/fs_collection_test.go
@@ -1221,8 +1221,8 @@ func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
c.Assert(err, check.IsNil)
}
}
- inodebytes := int64((nDirs*(67*2+1) + 1) * 64)
- c.Check(fs.MemorySize(), check.Equals, int64(nDirs*67<<20)+inodebytes)
+ inodebytes := int64((nDirs*(67+1) + 1) * 64)
+ c.Check(fs.MemorySize(), check.Equals, int64(nDirs*67*(1<<20+8))+inodebytes)
c.Check(flushed, check.Equals, int64(0))
waitForFlush := func(expectUnflushed, expectFlushed int64) {
@@ -1233,27 +1233,29 @@ func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
}
// Nothing flushed yet
- waitForFlush((nDirs*67)<<20+inodebytes, 0)
+ waitForFlush(nDirs*67*(1<<20+8)+inodebytes, 0)
// Flushing a non-empty dir "/" is non-recursive and there are
// no top-level files, so this has no effect
fs.Flush("/", false)
- waitForFlush((nDirs*67)<<20+inodebytes, 0)
+ waitForFlush(nDirs*67*(1<<20+8)+inodebytes, 0)
// Flush the full block in dir0
fs.Flush("dir0", false)
- waitForFlush((nDirs*67-64)<<20+inodebytes, 64<<20)
+ bigloclen := int64(32 + 9 + 51 + 40) // md5 + "+" + "67xxxxxx" + "+Axxxxxx..." + 40 (see (*dirnode)MemorySize)
+ waitForFlush((nDirs*67-64)*(1<<20+8)+inodebytes+bigloclen*64, 64<<20)
err = fs.Flush("dir-does-not-exist", false)
c.Check(err, check.NotNil)
// Flush full blocks in all dirs
fs.Flush("", false)
- waitForFlush(nDirs*3<<20+inodebytes, nDirs*64<<20)
+ waitForFlush(nDirs*3*(1<<20+8)+inodebytes+bigloclen*64*nDirs, nDirs*64<<20)
// Flush non-full blocks, too
fs.Flush("", true)
- waitForFlush(inodebytes, nDirs*67<<20)
+ smallloclen := int64(32 + 8 + 51 + 40) // md5 + "+" + "3xxxxxx" + "+Axxxxxx..." + 40 (see (*dirnode)MemorySize)
+ waitForFlush(inodebytes+bigloclen*64*nDirs+smallloclen*3*nDirs, nDirs*67<<20)
}
// Even when writing lots of files/dirs from different goroutines, as
diff --git a/sdk/go/arvados/fs_site.go b/sdk/go/arvados/fs_site.go
index bb2eee779..eed49296e 100644
--- a/sdk/go/arvados/fs_site.go
+++ b/sdk/go/arvados/fs_site.go
@@ -5,6 +5,7 @@
package arvados
import (
+ "net/http"
"os"
"strings"
"sync"
@@ -136,29 +137,43 @@ func (fs *customFileSystem) newNode(name string, perm os.FileMode, modTime time.
return nil, ErrInvalidOperation
}
-func (fs *customFileSystem) mountByID(parent inode, id string) inode {
+func (fs *customFileSystem) mountByID(parent inode, id string) (inode, error) {
if strings.Contains(id, "-4zz18-") || pdhRegexp.MatchString(id) {
return fs.mountCollection(parent, id)
} else if strings.Contains(id, "-j7d0g-") {
- return fs.newProjectNode(fs.root, id, id, nil)
+ return fs.newProjectNode(fs.root, id, id, nil), nil
} else {
- return nil
+ return nil, nil
}
}
-func (fs *customFileSystem) mountCollection(parent inode, id string) inode {
+func (fs *customFileSystem) mountCollection(parent inode, id string) (inode, error) {
var coll Collection
err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, nil)
- if err != nil {
- return nil
+ if statusErr, ok := err.(interface{ HTTPStatus() int }); ok && statusErr.HTTPStatus() == http.StatusNotFound {
+ return nil, nil
+ } else if err != nil {
+ return nil, err
+ }
+ if len(id) != 27 {
+ // This means id is a PDH, and controller/railsapi
+ // returned one of (possibly) many collections with
+ // that PDH. We don't want to expose (e.g., through
+ // Sys()) the metadata from that collection -- only
+ // the fields that are common to all matching
+ // collections, i.e., PDH and manifest.
+ coll = Collection{
+ PortableDataHash: coll.PortableDataHash,
+ ManifestText: coll.ManifestText,
+ }
}
newfs, err := coll.FileSystem(fs, fs)
if err != nil {
- return nil
+ return nil, err
}
cfs := newfs.(*collectionFileSystem)
cfs.SetParent(parent, id)
- return cfs
+ return cfs, nil
}
func (fs *customFileSystem) newProjectNode(root inode, name, uuid string, proj *Group) inode {
@@ -202,15 +217,19 @@ func (fs *customFileSystem) newProjectNode(root inode, name, uuid string, proj *
// treenode, or nil for ENOENT.
type vdirnode struct {
treenode
- create func(parent inode, name string) inode
+ create func(parent inode, name string) (inode, error)
}
func (vn *vdirnode) Child(name string, replace func(inode) (inode, error)) (inode, error) {
return vn.treenode.Child(name, func(existing inode) (inode, error) {
if existing == nil && vn.create != nil {
- existing = vn.create(vn, name)
- if existing != nil {
- existing.SetParent(vn, name)
+ newnode, err := vn.create(vn, name)
+ if err != nil {
+ return nil, err
+ }
+ if newnode != nil {
+ newnode.SetParent(vn, name)
+ existing = newnode
vn.treenode.fileinfo.modTime = time.Now()
}
}
diff --git a/sdk/go/arvadostest/fixtures.go b/sdk/go/arvadostest/fixtures.go
index ec5572541..ac12f7ae1 100644
--- a/sdk/go/arvadostest/fixtures.go
+++ b/sdk/go/arvadostest/fixtures.go
@@ -32,6 +32,7 @@ const (
HelloWorldPdh = "55713e6a34081eb03609e7ad5fcad129+62"
MultilevelCollection1 = "zzzzz-4zz18-pyw8yp9g3pr7irn"
+ MultilevelCollection1PDH = "f9ddda46bb293b6847da984e3aa735db+290"
StorageClassesDesiredDefaultConfirmedDefault = "zzzzz-4zz18-3t236wr12769tga"
StorageClassesDesiredArchiveConfirmedDefault = "zzzzz-4zz18-3t236wr12769qqa"
EmptyCollectionUUID = "zzzzz-4zz18-gs9ooj1h9sd5mde"
@@ -83,8 +84,11 @@ const (
Repository2UUID = "zzzzz-s0uqq-382brsig8rp3667"
Repository2Name = "active/foo2"
- FooCollectionSharingTokenUUID = "zzzzz-gj3su-gf02tdm4g1z3e3u"
- FooCollectionSharingToken = "iknqgmunrhgsyfok8uzjlwun9iscwm3xacmzmg65fa1j1lpdss"
+ FooFileCollectionUUID = "zzzzz-4zz18-znfnqtbbv4spc3w"
+ FooFileCollectionSharingTokenUUID = "zzzzz-gj3su-gf02tdm4g1z3e3u"
+ FooFileCollectionSharingToken = "iknqgmunrhgsyfok8uzjlwun9iscwm3xacmzmg65fa1j1lpdss"
+ BarFileCollectionUUID = "zzzzz-4zz18-ehbhgtheo8909or"
+ BarFileCollectionPDH = "fa7aeb5140e2848d39b416daeef4ffc5+45"
WorkflowWithDefinitionYAMLUUID = "zzzzz-7fd4e-validworkfloyml"
diff --git a/services/keep-web/cache.go b/services/keep-web/cache.go
index d5fdc4997..3ba3e60ab 100644
--- a/services/keep-web/cache.go
+++ b/services/keep-web/cache.go
@@ -5,6 +5,7 @@
package keepweb
import (
+ "net/http"
"sync"
"sync/atomic"
"time"
@@ -20,74 +21,32 @@ import (
const metricsUpdateInterval = time.Second / 10
type cache struct {
- cluster *arvados.Cluster
- logger logrus.FieldLogger
- registry *prometheus.Registry
- metrics cacheMetrics
- pdhs *lru.TwoQueueCache
- collections *lru.TwoQueueCache
- sessions *lru.TwoQueueCache
- setupOnce sync.Once
+ cluster *arvados.Cluster
+ logger logrus.FieldLogger
+ registry *prometheus.Registry
+ metrics cacheMetrics
+ sessions *lru.TwoQueueCache
+ setupOnce sync.Once
- chPruneSessions chan struct{}
- chPruneCollections chan struct{}
+ chPruneSessions chan struct{}
}
type cacheMetrics struct {
- requests prometheus.Counter
- collectionBytes prometheus.Gauge
- collectionEntries prometheus.Gauge
- sessionEntries prometheus.Gauge
- collectionHits prometheus.Counter
- pdhHits prometheus.Counter
- sessionHits prometheus.Counter
- sessionMisses prometheus.Counter
- apiCalls prometheus.Counter
+ requests prometheus.Counter
+ collectionBytes prometheus.Gauge
+ sessionEntries prometheus.Gauge
+ sessionHits prometheus.Counter
+ sessionMisses prometheus.Counter
}
func (m *cacheMetrics) setup(reg *prometheus.Registry) {
- m.requests = prometheus.NewCounter(prometheus.CounterOpts{
- Namespace: "arvados",
- Subsystem: "keepweb_collectioncache",
- Name: "requests",
- Help: "Number of targetID-to-manifest lookups handled.",
- })
- reg.MustRegister(m.requests)
- m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{
- Namespace: "arvados",
- Subsystem: "keepweb_collectioncache",
- Name: "hits",
- Help: "Number of pdh-to-manifest cache hits.",
- })
- reg.MustRegister(m.collectionHits)
- m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{
- Namespace: "arvados",
- Subsystem: "keepweb_collectioncache",
- Name: "pdh_hits",
- Help: "Number of uuid-to-pdh cache hits.",
- })
- reg.MustRegister(m.pdhHits)
- m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
- Namespace: "arvados",
- Subsystem: "keepweb_collectioncache",
- Name: "api_calls",
- Help: "Number of outgoing API calls made by cache.",
- })
- reg.MustRegister(m.apiCalls)
m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "keepweb_sessions",
- Name: "cached_collection_bytes",
- Help: "Total size of all cached manifests and sessions.",
+ Name: "cached_session_bytes",
+ Help: "Total size of all cached sessions.",
})
reg.MustRegister(m.collectionBytes)
- m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "keepweb_collectioncache",
- Name: "cached_manifests",
- Help: "Number of manifests in cache.",
- })
- reg.MustRegister(m.collectionEntries)
m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "keepweb_sessions",
@@ -111,21 +70,6 @@ func (m *cacheMetrics) setup(reg *prometheus.Registry) {
reg.MustRegister(m.sessionMisses)
}
-type cachedPDH struct {
- expire time.Time
- refresh time.Time
- pdh string
-}
-
-type cachedCollection struct {
- expire time.Time
- collection *arvados.Collection
-}
-
-type cachedPermission struct {
- expire time.Time
-}
-
type cachedSession struct {
expire time.Time
fs atomic.Value
@@ -137,14 +81,6 @@ type cachedSession struct {
func (c *cache) setup() {
var err error
- c.pdhs, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxUUIDEntries)
- if err != nil {
- panic(err)
- }
- c.collections, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxCollectionEntries)
- if err != nil {
- panic(err)
- }
c.sessions, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxSessions)
if err != nil {
panic(err)
@@ -160,12 +96,6 @@ func (c *cache) setup() {
c.updateGauges()
}
}()
- c.chPruneCollections = make(chan struct{}, 1)
- go func() {
- for range c.chPruneCollections {
- c.pruneCollections()
- }
- }()
c.chPruneSessions = make(chan struct{}, 1)
go func() {
for range c.chPruneSessions {
@@ -176,7 +106,6 @@ func (c *cache) setup() {
func (c *cache) updateGauges() {
c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
- c.metrics.collectionEntries.Set(float64(c.collections.Len()))
c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
}
@@ -184,39 +113,6 @@ var selectPDH = map[string]interface{}{
"select": []string{"portable_data_hash"},
}
-// Update saves a modified version (fs) to an existing collection
-// (coll) and, if successful, updates the relevant cache entries so
-// subsequent calls to Get() reflect the modifications.
-func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
- c.setupOnce.Do(c.setup)
-
- m, err := fs.MarshalManifest(".")
- if err != nil || m == coll.ManifestText {
- return err
- }
- coll.ManifestText = m
- var updated arvados.Collection
- err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
- "collection": map[string]string{
- "manifest_text": coll.ManifestText,
- },
- })
- if err != nil {
- c.pdhs.Remove(coll.UUID)
- return err
- }
- c.collections.Add(client.AuthToken+"\000"+updated.PortableDataHash, &cachedCollection{
- expire: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL)),
- collection: &updated,
- })
- c.pdhs.Add(coll.UUID, &cachedPDH{
- expire: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL)),
- refresh: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.UUIDTTL)),
- pdh: updated.PortableDataHash,
- })
- return nil
-}
-
// ResetSession unloads any potentially stale state. Should be called
// after write operations, so subsequent reads don't return stale
// data.
@@ -227,7 +123,7 @@ func (c *cache) ResetSession(token string) {
// Get a long-lived CustomFileSystem suitable for doing a read operation
// with the given token.
-func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, error) {
+func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, *arvados.User, error) {
c.setupOnce.Do(c.setup)
now := time.Now()
ent, _ := c.sessions.Get(token)
@@ -241,12 +137,12 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSessi
var err error
sess.client, err = arvados.NewClientFromConfig(c.cluster)
if err != nil {
- return nil, nil, err
+ return nil, nil, nil, err
}
sess.client.AuthToken = token
sess.arvadosclient, err = arvadosclient.New(sess.client)
if err != nil {
- return nil, nil, err
+ return nil, nil, nil, err
}
sess.keepclient = keepclient.New(sess.arvadosclient)
c.sessions.Add(token, sess)
@@ -260,14 +156,29 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSessi
case c.chPruneSessions <- struct{}{}:
default:
}
+
fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
- if fs != nil && !expired {
- return fs, sess, nil
+ if fs == nil || expired {
+ fs = sess.client.SiteFileSystem(sess.keepclient)
+ fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
+ sess.fs.Store(fs)
}
- fs = sess.client.SiteFileSystem(sess.keepclient)
- fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
- sess.fs.Store(fs)
- return fs, sess, nil
+
+ user, _ := sess.user.Load().(*arvados.User)
+ if user == nil || expired {
+ user = new(arvados.User)
+ err := sess.client.RequestAndDecode(user, "GET", "/arvados/v1/users/current", nil, nil)
+ if statusErr, ok := err.(interface{ HTTPStatus() int }); ok && statusErr.HTTPStatus() == http.StatusForbidden {
+ // token is OK, but "get user id" api is out
+ // of scope -- return nil, signifying unknown
+ // user
+ } else if err != nil {
+ return nil, nil, nil, err
+ }
+ sess.user.Store(user)
+ }
+
+ return fs, sess, user, nil
}
// Remove all expired session cache entries, then remove more entries
@@ -294,7 +205,7 @@ func (c *cache) pruneSessions() {
// least frequently used entries (which Keys() returns last).
for i := len(keys) - 1; i >= 0; i-- {
token := keys[i]
- if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
+ if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes {
break
}
ent, ok := c.sessions.Peek(token)
@@ -311,147 +222,10 @@ func (c *cache) pruneSessions() {
}
}
-func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
- c.setupOnce.Do(c.setup)
- c.metrics.requests.Inc()
-
- var pdhRefresh bool
- var pdh string
- if arvadosclient.PDHMatch(targetID) {
- pdh = targetID
- } else if ent, cached := c.pdhs.Get(targetID); cached {
- ent := ent.(*cachedPDH)
- if ent.expire.Before(time.Now()) {
- c.pdhs.Remove(targetID)
- } else {
- pdh = ent.pdh
- pdhRefresh = forceReload || time.Now().After(ent.refresh)
- c.metrics.pdhHits.Inc()
- }
- }
-
- if pdh == "" {
- // UUID->PDH mapping is not cached, might as well get
- // the whole collection record and be done (below).
- c.logger.Debugf("cache(%s): have no pdh", targetID)
- } else if cached := c.lookupCollection(arv.ApiToken + "\000" + pdh); cached == nil {
- // PDH->manifest is not cached, might as well get the
- // whole collection record (below).
- c.logger.Debugf("cache(%s): have pdh %s but manifest is not cached", targetID, pdh)
- } else if !pdhRefresh {
- // We looked up UUID->PDH very recently, and we still
- // have the manifest for that PDH.
- c.logger.Debugf("cache(%s): have pdh %s and refresh not needed", targetID, pdh)
- return cached, nil
- } else {
- // Get current PDH for this UUID (and confirm we still
- // have read permission). Most likely, the cached PDH
- // is still correct, in which case we can use our
- // cached manifest.
- c.metrics.apiCalls.Inc()
- var current arvados.Collection
- err := arv.Get("collections", targetID, selectPDH, &current)
- if err != nil {
- return nil, err
- }
- if current.PortableDataHash == pdh {
- // PDH has not changed, cached manifest is
- // correct.
- c.logger.Debugf("cache(%s): verified cached pdh %s is still correct", targetID, pdh)
- return cached, nil
- }
- if cached := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); cached != nil {
- // PDH changed, and we already have the
- // manifest for that new PDH.
- c.logger.Debugf("cache(%s): cached pdh %s was stale, new pdh is %s and manifest is already in cache", targetID, pdh, current.PortableDataHash)
- return cached, nil
- }
- }
-
- // Either UUID->PDH is not cached, or PDH->manifest is not
- // cached.
- var retrieved arvados.Collection
- c.metrics.apiCalls.Inc()
- err := arv.Get("collections", targetID, nil, &retrieved)
- if err != nil {
- return nil, err
- }
- c.logger.Debugf("cache(%s): retrieved manifest, caching with pdh %s", targetID, retrieved.PortableDataHash)
- exp := time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL))
- if targetID != retrieved.PortableDataHash {
- c.pdhs.Add(targetID, &cachedPDH{
- expire: exp,
- refresh: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.UUIDTTL)),
- pdh: retrieved.PortableDataHash,
- })
- }
- c.collections.Add(arv.ApiToken+"\000"+retrieved.PortableDataHash, &cachedCollection{
- expire: exp,
- collection: &retrieved,
- })
- if int64(len(retrieved.ManifestText)) > c.cluster.Collections.WebDAVCache.MaxCollectionBytes/int64(c.cluster.Collections.WebDAVCache.MaxCollectionEntries) {
- select {
- case c.chPruneCollections <- struct{}{}:
- default:
- }
- }
- return &retrieved, nil
-}
-
-// pruneCollections checks the total bytes occupied by manifest_text
-// in the collection cache and removes old entries as needed to bring
-// the total size down to CollectionBytes. It also deletes all expired
-// entries.
-//
-// pruneCollections does not aim to be perfectly correct when there is
-// concurrent cache activity.
-func (c *cache) pruneCollections() {
- var size int64
- now := time.Now()
- keys := c.collections.Keys()
- entsize := make([]int, len(keys))
- expired := make([]bool, len(keys))
- for i, k := range keys {
- v, ok := c.collections.Peek(k)
- if !ok {
- continue
- }
- ent := v.(*cachedCollection)
- n := len(ent.collection.ManifestText)
- size += int64(n)
- entsize[i] = n
- expired[i] = ent.expire.Before(now)
- }
- for i, k := range keys {
- if expired[i] {
- c.collections.Remove(k)
- size -= int64(entsize[i])
- }
- }
- for i, k := range keys {
- if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
- break
- }
- if expired[i] {
- // already removed this entry in the previous loop
- continue
- }
- c.collections.Remove(k)
- size -= int64(entsize[i])
- }
-}
-
// collectionBytes returns the approximate combined memory size of the
// collection cache and session filesystem cache.
func (c *cache) collectionBytes() uint64 {
var size uint64
- for _, k := range c.collections.Keys() {
- v, ok := c.collections.Peek(k)
- if !ok {
- continue
- }
- size += uint64(len(v.(*cachedCollection).collection.ManifestText))
- }
for _, token := range c.sessions.Keys() {
ent, ok := c.sessions.Peek(token)
if !ok {
@@ -463,49 +237,3 @@ func (c *cache) collectionBytes() uint64 {
}
return size
}
-
-func (c *cache) lookupCollection(key string) *arvados.Collection {
- e, cached := c.collections.Get(key)
- if !cached {
- return nil
- }
- ent := e.(*cachedCollection)
- if ent.expire.Before(time.Now()) {
- c.collections.Remove(key)
- return nil
- }
- c.metrics.collectionHits.Inc()
- return ent.collection
-}
-
-func (c *cache) GetTokenUser(token string) (*arvados.User, error) {
- // Get and cache user record associated with this
- // token. We need to know their UUID for logging, and
- // whether they are an admin or not for certain
- // permission checks.
-
- // Get/create session entry
- _, sess, err := c.GetSession(token)
- if err != nil {
- return nil, err
- }
-
- // See if the user is already set, and if so, return it
- user, _ := sess.user.Load().(*arvados.User)
- if user != nil {
- return user, nil
- }
-
- // Fetch the user record
- c.metrics.apiCalls.Inc()
- var current arvados.User
-
- err = sess.client.RequestAndDecode(&current, "GET", "/arvados/v1/users/current", nil, nil)
- if err != nil {
- return nil, err
- }
-
- // Stash the user record for next time
- sess.user.Store(&current)
- return &current, nil
-}
diff --git a/services/keep-web/cache_test.go b/services/keep-web/cache_test.go
index 6b8f42717..010e29a0b 100644
--- a/services/keep-web/cache_test.go
+++ b/services/keep-web/cache_test.go
@@ -6,17 +6,21 @@ package keepweb
import (
"bytes"
+ "net/http"
+ "net/http/httptest"
+ "regexp"
+ "strings"
+ "time"
"git.arvados.org/arvados.git/sdk/go/arvados"
- "git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
- "git.arvados.org/arvados.git/sdk/go/ctxlog"
- "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/expfmt"
"gopkg.in/check.v1"
)
-func (s *UnitSuite) checkCacheMetrics(c *check.C, reg *prometheus.Registry, regs ...string) {
+func (s *IntegrationSuite) checkCacheMetrics(c *check.C, regs ...string) {
+ s.handler.Cache.updateGauges()
+ reg := s.handler.Cache.registry
mfs, err := reg.Gather()
c.Check(err, check.IsNil)
buf := &bytes.Buffer{}
@@ -25,131 +29,139 @@ func (s *UnitSuite) checkCacheMetrics(c *check.C, reg *prometheus.Registry, regs
c.Check(enc.Encode(mf), check.IsNil)
}
mm := buf.String()
+ // Remove comments to make the "value vs. regexp" failure
+ // output easier to read.
+ mm = regexp.MustCompile(`(?m)^#.*\n`).ReplaceAllString(mm, "")
for _, reg := range regs {
- c.Check(mm, check.Matches, `(?ms).*collectioncache_`+reg+`\n.*`)
+ c.Check(mm, check.Matches, `(?ms).*keepweb_sessions_`+reg+`\n.*`)
}
}
-func (s *UnitSuite) TestCache(c *check.C) {
- arv, err := arvadosclient.MakeArvadosClient()
- c.Assert(err, check.Equals, nil)
-
- cache := &cache{
- cluster: s.cluster,
- logger: ctxlog.TestLogger(c),
- registry: prometheus.NewRegistry(),
- }
-
+func (s *IntegrationSuite) TestCache(c *check.C) {
// Hit the same collection 5 times using the same token. Only
// the first req should cause an API call; the next 4 should
// hit all caches.
- arv.ApiToken = arvadostest.AdminToken
- var coll *arvados.Collection
+ u := mustParseURL("http://" + arvadostest.FooCollection + ".keep-web.example/foo")
+ req := &http.Request{
+ Method: "GET",
+ Host: u.Host,
+ URL: u,
+ RequestURI: u.RequestURI(),
+ Header: http.Header{
+ "Authorization": {"Bearer " + arvadostest.ActiveToken},
+ },
+ }
for i := 0; i < 5; i++ {
- coll, err = cache.Get(arv, arvadostest.FooCollection, false)
- c.Check(err, check.Equals, nil)
- c.Assert(coll, check.NotNil)
- c.Check(coll.PortableDataHash, check.Equals, arvadostest.FooCollectionPDH)
- c.Check(coll.ManifestText[:2], check.Equals, ". ")
+ resp := httptest.NewRecorder()
+ s.handler.ServeHTTP(resp, req)
+ c.Check(resp.Code, check.Equals, http.StatusOK)
}
- s.checkCacheMetrics(c, cache.registry,
- "requests 5",
+ s.checkCacheMetrics(c,
"hits 4",
- "pdh_hits 4",
- "api_calls 1")
-
- // Hit the same collection 2 more times, this time requesting
- // it by PDH and using a different token. The first req should
- // miss the permission cache and fetch the new manifest; the
- // second should hit the Collection cache and skip the API
- // lookup.
- arv.ApiToken = arvadostest.ActiveToken
-
- coll2, err := cache.Get(arv, arvadostest.FooCollectionPDH, false)
- c.Check(err, check.Equals, nil)
- c.Assert(coll2, check.NotNil)
- c.Check(coll2.PortableDataHash, check.Equals, arvadostest.FooCollectionPDH)
- c.Check(coll2.ManifestText[:2], check.Equals, ". ")
- c.Check(coll2.ManifestText, check.Not(check.Equals), coll.ManifestText)
-
- s.checkCacheMetrics(c, cache.registry,
- "requests 6",
- "hits 4",
- "pdh_hits 4",
- "api_calls 2")
-
- coll2, err = cache.Get(arv, arvadostest.FooCollectionPDH, false)
- c.Check(err, check.Equals, nil)
- c.Assert(coll2, check.NotNil)
- c.Check(coll2.PortableDataHash, check.Equals, arvadostest.FooCollectionPDH)
- c.Check(coll2.ManifestText[:2], check.Equals, ". ")
-
- s.checkCacheMetrics(c, cache.registry,
- "requests 7",
- "hits 5",
- "pdh_hits 4",
- "api_calls 2")
-
- // Alternating between two collections N times should produce
- // only 2 more API calls.
- arv.ApiToken = arvadostest.AdminToken
- for i := 0; i < 20; i++ {
- var target string
- if i%2 == 0 {
- target = arvadostest.HelloWorldCollection
- } else {
- target = arvadostest.FooBarDirCollection
- }
- _, err := cache.Get(arv, target, false)
- c.Check(err, check.Equals, nil)
+ "misses 1",
+ "active 1")
+
+ // Hit a shared collection 3 times using PDH, using a
+ // different token.
+ u2 := mustParseURL("http://" + strings.Replace(arvadostest.BarFileCollectionPDH, "+", "-", 1) + ".keep-web.example/bar")
+ req2 := &http.Request{
+ Method: "GET",
+ Host: u2.Host,
+ URL: u2,
+ RequestURI: u2.RequestURI(),
+ Header: http.Header{
+ "Authorization": {"Bearer " + arvadostest.SpectatorToken},
+ },
}
- s.checkCacheMetrics(c, cache.registry,
- "requests 27",
- "hits 23",
- "pdh_hits 22",
- "api_calls 4")
-}
-
-func (s *UnitSuite) TestCacheForceReloadByPDH(c *check.C) {
- arv, err := arvadosclient.MakeArvadosClient()
- c.Assert(err, check.Equals, nil)
-
- cache := &cache{
- cluster: s.cluster,
- logger: ctxlog.TestLogger(c),
- registry: prometheus.NewRegistry(),
+ for i := 0; i < 3; i++ {
+ resp2 := httptest.NewRecorder()
+ s.handler.ServeHTTP(resp2, req2)
+ c.Check(resp2.Code, check.Equals, http.StatusOK)
}
-
- for _, forceReload := range []bool{false, true, false, true} {
- _, err := cache.Get(arv, arvadostest.FooCollectionPDH, forceReload)
- c.Check(err, check.Equals, nil)
+ s.checkCacheMetrics(c,
+ "hits 6",
+ "misses 2",
+ "active 2")
+
+ // Alternating between two collections/tokens N times should
+ // use the existing sessions.
+ for i := 0; i < 7; i++ {
+ resp := httptest.NewRecorder()
+ s.handler.ServeHTTP(resp, req)
+ c.Check(resp.Code, check.Equals, http.StatusOK)
+
+ resp2 := httptest.NewRecorder()
+ s.handler.ServeHTTP(resp2, req2)
+ c.Check(resp2.Code, check.Equals, http.StatusOK)
}
-
- s.checkCacheMetrics(c, cache.registry,
- "requests 4",
- "hits 3",
- "pdh_hits 0",
- "api_calls 1")
+ s.checkCacheMetrics(c,
+ "hits 20",
+ "misses 2",
+ "active 2")
}
-func (s *UnitSuite) TestCacheForceReloadByUUID(c *check.C) {
- arv, err := arvadosclient.MakeArvadosClient()
- c.Assert(err, check.Equals, nil)
-
- cache := &cache{
- cluster: s.cluster,
- logger: ctxlog.TestLogger(c),
- registry: prometheus.NewRegistry(),
- }
-
- for _, forceReload := range []bool{false, true, false, true} {
- _, err := cache.Get(arv, arvadostest.FooCollection, forceReload)
- c.Check(err, check.Equals, nil)
- }
+func (s *IntegrationSuite) TestForceReloadPDH(c *check.C) {
+ filename := strings.Replace(time.Now().Format(time.RFC3339Nano), ":", ".", -1)
+ manifest := ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:" + filename + "\n"
+ pdh := arvados.PortableDataHash(manifest)
+ client := arvados.NewClientFromEnv()
+ client.AuthToken = arvadostest.ActiveToken
+
+ _, resp := s.do("GET", "http://"+strings.Replace(pdh, "+", "-", 1)+".keep-web.example/"+filename, arvadostest.ActiveToken, nil)
+ c.Check(resp.Code, check.Equals, http.StatusNotFound)
+
+ var coll arvados.Collection
+ err := client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
+ "collection": map[string]string{
+ "manifest_text": manifest,
+ },
+ })
+ c.Assert(err, check.IsNil)
+ defer client.RequestAndDecode(nil, "DELETE", "arvados/v1/collections/"+coll.UUID, nil, nil)
+ c.Assert(coll.PortableDataHash, check.Equals, pdh)
+
+ _, resp = s.do("GET", "http://"+strings.Replace(pdh, "+", "-", 1)+".keep-web.example/"+filename, "", http.Header{
+ "Authorization": {"Bearer " + arvadostest.ActiveToken},
+ "Cache-Control": {"must-revalidate"},
+ })
+ c.Check(resp.Code, check.Equals, http.StatusOK)
+
+ _, resp = s.do("GET", "http://"+strings.Replace(pdh, "+", "-", 1)+".keep-web.example/missingfile", "", http.Header{
+ "Authorization": {"Bearer " + arvadostest.ActiveToken},
+ "Cache-Control": {"must-revalidate"},
+ })
+ c.Check(resp.Code, check.Equals, http.StatusNotFound)
+}
- s.checkCacheMetrics(c, cache.registry,
- "requests 4",
- "hits 3",
- "pdh_hits 3",
- "api_calls 3")
+func (s *IntegrationSuite) TestForceReloadUUID(c *check.C) {
+ client := arvados.NewClientFromEnv()
+ client.AuthToken = arvadostest.ActiveToken
+ var coll arvados.Collection
+ err := client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
+ "collection": map[string]string{
+ "manifest_text": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:oldfile\n",
+ },
+ })
+ c.Assert(err, check.IsNil)
+ defer client.RequestAndDecode(nil, "DELETE", "arvados/v1/collections/"+coll.UUID, nil, nil)
+
+ _, resp := s.do("GET", "http://"+coll.UUID+".keep-web.example/newfile", arvadostest.ActiveToken, nil)
+ c.Check(resp.Code, check.Equals, http.StatusNotFound)
+ _, resp = s.do("GET", "http://"+coll.UUID+".keep-web.example/oldfile", arvadostest.ActiveToken, nil)
+ c.Check(resp.Code, check.Equals, http.StatusOK)
+ _, resp = s.do("GET", "http://"+coll.UUID+".keep-web.example/newfile", arvadostest.ActiveToken, nil)
+ c.Check(resp.Code, check.Equals, http.StatusNotFound)
+ err = client.RequestAndDecode(&coll, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
+ "collection": map[string]string{
+ "manifest_text": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:oldfile 0:0:newfile\n",
+ },
+ })
+ c.Assert(err, check.IsNil)
+ _, resp = s.do("GET", "http://"+coll.UUID+".keep-web.example/newfile", arvadostest.ActiveToken, nil)
+ c.Check(resp.Code, check.Equals, http.StatusNotFound)
+ _, resp = s.do("GET", "http://"+coll.UUID+".keep-web.example/newfile", "", http.Header{
+ "Authorization": {"Bearer " + arvadostest.ActiveToken},
+ "Cache-Control": {"must-revalidate"},
+ })
+ c.Check(resp.Code, check.Equals, http.StatusOK)
}
diff --git a/services/keep-web/handler.go b/services/keep-web/handler.go
index 3a1d9acde..7ac8bc02d 100644
--- a/services/keep-web/handler.go
+++ b/services/keep-web/handler.go
@@ -6,6 +6,7 @@ package keepweb
import (
"encoding/json"
+ "errors"
"fmt"
"html"
"html/template"
@@ -39,8 +40,8 @@ type handler struct {
var urlPDHDecoder = strings.NewReplacer(" ", "+", "-", "+")
-var notFoundMessage = "404 Not found\r\n\r\nThe requested path was not found, or you do not have permission to access it.\r"
-var unauthorizedMessage = "401 Unauthorized\r\n\r\nA valid Arvados token must be provided to access this resource.\r"
+var notFoundMessage = "Not Found"
+var unauthorizedMessage = "401 Unauthorized\r\n\r\nA valid Arvados token must be provided to access this resource.\r\n"
// parseCollectionIDFromURL returns a UUID or PDH if s is a UUID or a
// PDH (even if it is a PDH with "+" replaced by " " or "-");
@@ -73,9 +74,10 @@ func (h *handler) serveStatus(w http.ResponseWriter, r *http.Request) {
// updateOnSuccess wraps httpserver.ResponseWriter. If the handler
// sends an HTTP header indicating success, updateOnSuccess first
-// calls the provided update func. If the update func fails, a 500
-// response is sent, and the status code and body sent by the handler
-// are ignored (all response writes return the update error).
+// calls the provided update func. If the update func fails, an error
+// response is sent (using the error's HTTP status or 500 if none),
+// and the status code and body sent by the handler are ignored (all
+// response writes return the update error).
type updateOnSuccess struct {
httpserver.ResponseWriter
logger logrus.FieldLogger
@@ -100,10 +102,11 @@ func (uos *updateOnSuccess) WriteHeader(code int) {
if code >= 200 && code < 400 {
if uos.err = uos.update(); uos.err != nil {
code := http.StatusInternalServerError
- if err, ok := uos.err.(*arvados.TransactionError); ok {
- code = err.StatusCode
+ var he interface{ HTTPStatus() int }
+ if errors.As(uos.err, &he) {
+ code = he.HTTPStatus()
}
- uos.logger.WithError(uos.err).Errorf("update() returned error type %T, changing response to HTTP %d", uos.err, code)
+ uos.logger.WithError(uos.err).Errorf("update() returned %T error, changing response to HTTP %d", uos.err, code)
http.Error(uos.ResponseWriter, uos.err.Error(), code)
return
}
@@ -369,30 +372,60 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
}
defer h.clientPool.Put(arv)
- var collection *arvados.Collection
+ dirOpenMode := os.O_RDONLY
+ if writeMethod[r.Method] {
+ dirOpenMode = os.O_RDWR
+ }
+
+ validToken := make(map[string]bool)
+ var token string
var tokenUser *arvados.User
- tokenResult := make(map[string]int)
- for _, arv.ApiToken = range tokens {
- var err error
- collection, err = h.Cache.Get(arv, collectionID, forceReload)
- if err == nil {
- // Success
- break
+ var sessionFS arvados.CustomFileSystem
+ var session *cachedSession
+ var collectionDir arvados.File
+ for _, token = range tokens {
+ var statusErr interface{ HTTPStatus() int }
+ fs, sess, user, err := h.Cache.GetSession(token)
+ if errors.As(err, &statusErr) && statusErr.HTTPStatus() == http.StatusUnauthorized {
+ // bad token
+ continue
+ } else if err != nil {
+ http.Error(w, "cache error: "+err.Error(), http.StatusInternalServerError)
+ return
+ }
+ f, err := fs.OpenFile("by_id/"+collectionID, dirOpenMode, 0)
+ if errors.As(err, &statusErr) && statusErr.HTTPStatus() == http.StatusForbidden {
+ // collection id is outside token scope
+ validToken[token] = true
+ continue
}
- if srvErr, ok := err.(arvadosclient.APIServerError); ok {
- switch srvErr.HttpStatusCode {
- case 404, 401:
- // Token broken or insufficient to
- // retrieve collection
- tokenResult[arv.ApiToken] = srvErr.HttpStatusCode
- continue
+ validToken[token] = true
+ if os.IsNotExist(err) {
+ // collection does not exist or is not
+ // readable using this token
+ continue
+ } else if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ defer f.Close()
+
+ collectionDir, sessionFS, session, tokenUser = f, fs, sess, user
+ break
+ }
+ if forceReload {
+ err := collectionDir.Sync()
+ if err != nil {
+ var statusErr interface{ HTTPStatus() int }
+ if errors.As(err, &statusErr) {
+ http.Error(w, err.Error(), statusErr.HTTPStatus())
+ } else {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
}
+ return
}
- // Something more serious is wrong
- http.Error(w, "cache error: "+err.Error(), http.StatusInternalServerError)
- return
}
- if collection == nil {
+ if session == nil {
if pathToken || !credentialsOK {
// Either the URL is a "secret sharing link"
// that didn't work out (and asking the client
@@ -403,9 +436,9 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
return
}
for _, t := range reqTokens {
- if tokenResult[t] == 404 {
- // The client provided valid token(s), but the
- // collection was not found.
+ if validToken[t] {
+ // The client provided valid token(s),
+ // but the collection was not found.
http.Error(w, notFoundMessage, http.StatusNotFound)
return
}
@@ -452,118 +485,104 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
return
}
- kc, err := keepclient.MakeKeepClient(arv)
- if err != nil {
- http.Error(w, "error setting up keep client: "+err.Error(), http.StatusInternalServerError)
- return
- }
- kc.RequestID = r.Header.Get("X-Request-Id")
-
var basename string
if len(targetPath) > 0 {
basename = targetPath[len(targetPath)-1]
}
applyContentDispositionHdr(w, r, basename, attachment)
- client := (&arvados.Client{
- APIHost: arv.ApiServer,
- AuthToken: arv.ApiToken,
- Insecure: arv.ApiInsecure,
- }).WithRequestID(r.Header.Get("X-Request-Id"))
-
- fs, err := collection.FileSystem(client, kc)
- if err != nil {
- http.Error(w, "error creating collection filesystem: "+err.Error(), http.StatusInternalServerError)
+ if arvadosclient.PDHMatch(collectionID) && writeMethod[r.Method] {
+ http.Error(w, errReadOnly.Error(), http.StatusMethodNotAllowed)
return
}
-
- writefs, writeOK := fs.(arvados.CollectionFileSystem)
- targetIsPDH := arvadosclient.PDHMatch(collectionID)
- if (targetIsPDH || !writeOK) && writeMethod[r.Method] {
- http.Error(w, errReadOnly.Error(), http.StatusMethodNotAllowed)
+ if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
+ http.Error(w, "Not permitted", http.StatusForbidden)
return
}
+ h.logUploadOrDownload(r, session.arvadosclient, sessionFS, "by_id/"+collectionID+"/"+strings.Join(targetPath, "/"), nil, tokenUser)
- // Check configured permission
- _, sess, err := h.Cache.GetSession(arv.ApiToken)
- tokenUser, err = h.Cache.GetTokenUser(arv.ApiToken)
-
- if webdavMethod[r.Method] {
- if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
- http.Error(w, "Not permitted", http.StatusForbidden)
+ if writeMethod[r.Method] {
+ // Save the collection only if/when all
+ // webdav->filesystem operations succeed --
+ // and send a 500 error if the modified
+ // collection can't be saved.
+ //
+ // Perform the write in a separate sitefs, so
+ // concurrent read operations on the same
+ // collection see the previous saved
+ // state. After the write succeeds and the
+ // collection record is updated, we reset the
+ // session so the updates are visible in
+ // subsequent read requests.
+ client := session.client.WithRequestID(r.Header.Get("X-Request-Id"))
+ sessionFS = client.SiteFileSystem(session.keepclient)
+ writingDir, err := sessionFS.OpenFile("by_id/"+collectionID, os.O_RDONLY, 0)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
- h.logUploadOrDownload(r, sess.arvadosclient, nil, strings.Join(targetPath, "/"), collection, tokenUser)
-
- if writeMethod[r.Method] {
- // Save the collection only if/when all
- // webdav->filesystem operations succeed --
- // and send a 500 error if the modified
- // collection can't be saved.
- w = &updateOnSuccess{
- ResponseWriter: w,
- logger: ctxlog.FromContext(r.Context()),
- update: func() error {
- return h.Cache.Update(client, *collection, writefs)
- }}
- }
- h := webdav.Handler{
- Prefix: "/" + strings.Join(pathParts[:stripParts], "/"),
- FileSystem: &webdavFS{
- collfs: fs,
- writing: writeMethod[r.Method],
- alwaysReadEOF: r.Method == "PROPFIND",
- },
- LockSystem: h.webdavLS,
- Logger: func(_ *http.Request, err error) {
+ defer writingDir.Close()
+ w = &updateOnSuccess{
+ ResponseWriter: w,
+ logger: ctxlog.FromContext(r.Context()),
+ update: func() error {
+ err := writingDir.Sync()
+ var te arvados.TransactionError
+ if errors.As(err, &te) {
+ err = te
+ }
+ if err != nil {
+ return err
+ }
+ // Sync the changes to the persistent
+ // sessionfs for this token.
+ snap, err := writingDir.Snapshot()
if err != nil {
- ctxlog.FromContext(r.Context()).WithError(err).Error("error reported by webdav handler")
+ return err
}
- },
- }
- h.ServeHTTP(w, r)
- return
+ collectionDir.Splice(snap)
+ return nil
+ }}
}
-
- openPath := "/" + strings.Join(targetPath, "/")
- f, err := fs.Open(openPath)
- if os.IsNotExist(err) {
- // Requested non-existent path
- http.Error(w, notFoundMessage, http.StatusNotFound)
- return
- } else if err != nil {
- // Some other (unexpected) error
- http.Error(w, "open: "+err.Error(), http.StatusInternalServerError)
- return
+ wh := webdav.Handler{
+ Prefix: "/" + strings.Join(pathParts[:stripParts], "/"),
+ FileSystem: &webdavFS{
+ collfs: sessionFS,
+ prefix: "by_id/" + collectionID + "/",
+ writing: writeMethod[r.Method],
+ alwaysReadEOF: r.Method == "PROPFIND",
+ },
+ LockSystem: h.webdavLS,
+ Logger: func(r *http.Request, err error) {
+ if err != nil {
+ ctxlog.FromContext(r.Context()).WithError(err).Error("error reported by webdav handler")
+ }
+ },
}
- defer f.Close()
- if stat, err := f.Stat(); err != nil {
- // Can't get Size/IsDir (shouldn't happen with a collectionFS!)
- http.Error(w, "stat: "+err.Error(), http.StatusInternalServerError)
- } else if stat.IsDir() && !strings.HasSuffix(r.URL.Path, "/") {
- // If client requests ".../dirname", redirect to
- // ".../dirname/". This way, relative links in the
- // listing for "dirname" can always be "fnm", never
- // "dirname/fnm".
- h.seeOtherWithCookie(w, r, r.URL.Path+"/", credentialsOK)
- } else if stat.IsDir() {
- h.serveDirectory(w, r, collection.Name, fs, openPath, true)
- } else {
- if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
- http.Error(w, "Not permitted", http.StatusForbidden)
+ if r.Method == http.MethodGet || r.Method == http.MethodHead {
+ targetfnm := "by_id/" + collectionID + "/" + strings.Join(pathParts[stripParts:], "/")
+ if fi, err := sessionFS.Stat(targetfnm); err == nil && fi.IsDir() {
+ if !strings.HasSuffix(r.URL.Path, "/") {
+ h.seeOtherWithCookie(w, r, r.URL.Path+"/", credentialsOK)
+ } else {
+ h.serveDirectory(w, r, fi.Name(), sessionFS, targetfnm, true)
+ }
return
}
- h.logUploadOrDownload(r, sess.arvadosclient, nil, strings.Join(targetPath, "/"), collection, tokenUser)
-
- http.ServeContent(w, r, basename, stat.ModTime(), f)
- if wrote := int64(w.WroteBodyBytes()); wrote != stat.Size() && w.WroteStatus() == http.StatusOK {
- // If we wrote fewer bytes than expected, it's
- // too late to change the real response code
- // or send an error message to the client, but
- // at least we can try to put some useful
- // debugging info in the logs.
- n, err := f.Read(make([]byte, 1024))
- ctxlog.FromContext(r.Context()).Errorf("stat.Size()==%d but only wrote %d bytes; read(1024) returns %d, %v", stat.Size(), wrote, n, err)
+ }
+ wh.ServeHTTP(w, r)
+ if r.Method == http.MethodGet && w.WroteStatus() == http.StatusOK {
+ wrote := int64(w.WroteBodyBytes())
+ fnm := strings.Join(pathParts[stripParts:], "/")
+ fi, err := wh.FileSystem.Stat(r.Context(), fnm)
+ if err == nil && fi.Size() != wrote {
+ var n int
+ f, err := wh.FileSystem.OpenFile(r.Context(), fnm, os.O_RDONLY, 0)
+ if err == nil {
+ n, err = f.Read(make([]byte, 1024))
+ f.Close()
+ }
+ ctxlog.FromContext(r.Context()).Errorf("stat.Size()==%d but only wrote %d bytes; read(1024) returns %d, %v", fi.Size(), wrote, n, err)
}
}
}
@@ -601,12 +620,11 @@ func (h *handler) serveSiteFS(w http.ResponseWriter, r *http.Request, tokens []s
return
}
- fs, sess, err := h.Cache.GetSession(tokens[0])
+ fs, sess, user, err := h.Cache.GetSession(tokens[0])
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
- fs.ForwardSlashNameSubstitution(h.Cluster.Collections.ForwardSlashNameSubstitution)
f, err := fs.Open(r.URL.Path)
if os.IsNotExist(err) {
http.Error(w, err.Error(), http.StatusNotFound)
@@ -625,19 +643,17 @@ func (h *handler) serveSiteFS(w http.ResponseWriter, r *http.Request, tokens []s
return
}
- tokenUser, err := h.Cache.GetTokenUser(tokens[0])
- if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
+ if !h.userPermittedToUploadOrDownload(r.Method, user) {
http.Error(w, "Not permitted", http.StatusForbidden)
return
}
- h.logUploadOrDownload(r, sess.arvadosclient, fs, r.URL.Path, nil, tokenUser)
+ h.logUploadOrDownload(r, sess.arvadosclient, fs, r.URL.Path, nil, user)
if r.Method == "GET" {
_, basename := filepath.Split(r.URL.Path)
applyContentDispositionHdr(w, r, basename, attachment)
}
wh := webdav.Handler{
- Prefix: "/",
FileSystem: &webdavFS{
collfs: fs,
writing: writeMethod[r.Method],
@@ -960,9 +976,14 @@ func (h *handler) logUploadOrDownload(
func (h *handler) determineCollection(fs arvados.CustomFileSystem, path string) (*arvados.Collection, string) {
target := strings.TrimSuffix(path, "/")
- for {
+ for cut := len(target); cut >= 0; cut = strings.LastIndexByte(target, '/') {
+ target = target[:cut]
fi, err := fs.Stat(target)
- if err != nil {
+ if os.IsNotExist(err) {
+ // creating a new file/dir, or download
+ // destined to fail
+ continue
+ } else if err != nil {
return nil, ""
}
switch src := fi.Sys().(type) {
@@ -975,11 +996,6 @@ func (h *handler) determineCollection(fs arvados.CustomFileSystem, path string)
return nil, ""
}
}
- // Try parent
- cut := strings.LastIndexByte(target, '/')
- if cut < 0 {
- return nil, ""
- }
- target = target[:cut]
}
+ return nil, ""
}
diff --git a/services/keep-web/handler_test.go b/services/keep-web/handler_test.go
index 768013185..0f7d50787 100644
--- a/services/keep-web/handler_test.go
+++ b/services/keep-web/handler_test.go
@@ -366,6 +366,24 @@ func (s *IntegrationSuite) TestVhostPortMatch(c *check.C) {
}
}
+func (s *IntegrationSuite) do(method string, urlstring string, token string, hdr http.Header) (*http.Request, *httptest.ResponseRecorder) {
+ u := mustParseURL(urlstring)
+ if hdr == nil && token != "" {
+ hdr = http.Header{"Authorization": {"Bearer " + token}}
+ } else if hdr == nil {
+ hdr = http.Header{}
+ } else if token != "" {
+ panic("must not pass both token and hdr")
+ }
+ return s.doReq(&http.Request{
+ Method: method,
+ Host: u.Host,
+ URL: u,
+ RequestURI: u.RequestURI(),
+ Header: hdr,
+ })
+}
+
func (s *IntegrationSuite) doReq(req *http.Request) (*http.Request, *httptest.ResponseRecorder) {
resp := httptest.NewRecorder()
s.handler.ServeHTTP(resp, req)
@@ -409,6 +427,26 @@ func (s *IntegrationSuite) TestSingleOriginSecretLink(c *check.C) {
)
}
+func (s *IntegrationSuite) TestCollectionSharingToken(c *check.C) {
+ s.testVhostRedirectTokenToCookie(c, "GET",
+ "example.com/c="+arvadostest.FooFileCollectionUUID+"/t="+arvadostest.FooFileCollectionSharingToken+"/foo",
+ "",
+ nil,
+ "",
+ http.StatusOK,
+ "foo",
+ )
+ // Same valid sharing token, but requesting a different collection
+ s.testVhostRedirectTokenToCookie(c, "GET",
+ "example.com/c="+arvadostest.FooCollection+"/t="+arvadostest.FooFileCollectionSharingToken+"/foo",
+ "",
+ nil,
+ "",
+ http.StatusNotFound,
+ notFoundMessage+"\n",
+ )
+}
+
// Bad token in URL is 404 Not Found because it doesn't make sense to
// retry the same URL with different authorization.
func (s *IntegrationSuite) TestSingleOriginSecretLinkBadToken(c *check.C) {
@@ -1245,7 +1283,7 @@ func copyHeader(h http.Header) http.Header {
}
func (s *IntegrationSuite) checkUploadDownloadRequest(c *check.C, req *http.Request,
- successCode int, direction string, perm bool, userUuid string, collectionUuid string, filepath string) {
+ successCode int, direction string, perm bool, userUuid, collectionUuid, collectionPDH, filepath string) {
client := arvados.NewClientFromEnv()
client.AuthToken = arvadostest.AdminToken
@@ -1258,6 +1296,7 @@ func (s *IntegrationSuite) checkUploadDownloadRequest(c *check.C, req *http.Requ
c.Check(err, check.IsNil)
c.Check(logentries.Items, check.HasLen, 1)
lastLogId := logentries.Items[0].ID
+ c.Logf("lastLogId: %d", lastLogId)
var logbuf bytes.Buffer
logger := logrus.New()
@@ -1274,6 +1313,7 @@ func (s *IntegrationSuite) checkUploadDownloadRequest(c *check.C, req *http.Requ
deadline := time.Now().Add(time.Second)
for {
c.Assert(time.Now().After(deadline), check.Equals, false, check.Commentf("timed out waiting for log entry"))
+ logentries = arvados.LogList{}
err = client.RequestAndDecode(&logentries, "GET", "arvados/v1/logs", nil,
arvados.ResourceListParams{
Filters: []arvados.Filter{
@@ -1288,6 +1328,7 @@ func (s *IntegrationSuite) checkUploadDownloadRequest(c *check.C, req *http.Requ
logentries.Items[0].ID > lastLogId &&
logentries.Items[0].ObjectUUID == userUuid &&
logentries.Items[0].Properties["collection_uuid"] == collectionUuid &&
+ (collectionPDH == "" || logentries.Items[0].Properties["portable_data_hash"] == collectionPDH) &&
logentries.Items[0].Properties["collection_file_path"] == filepath {
break
}
@@ -1321,7 +1362,7 @@ func (s *IntegrationSuite) TestDownloadLoggingPermission(c *check.C) {
},
}
s.checkUploadDownloadRequest(c, req, http.StatusOK, "download", adminperm,
- arvadostest.AdminUserUUID, arvadostest.FooCollection, "foo")
+ arvadostest.AdminUserUUID, arvadostest.FooCollection, arvadostest.FooCollectionPDH, "foo")
// Test user permission
req = &http.Request{
@@ -1334,7 +1375,7 @@ func (s *IntegrationSuite) TestDownloadLoggingPermission(c *check.C) {
},
}
s.checkUploadDownloadRequest(c, req, http.StatusOK, "download", userperm,
- arvadostest.ActiveUserUUID, arvadostest.FooCollection, "foo")
+ arvadostest.ActiveUserUUID, arvadostest.FooCollection, arvadostest.FooCollectionPDH, "foo")
}
}
@@ -1354,7 +1395,7 @@ func (s *IntegrationSuite) TestDownloadLoggingPermission(c *check.C) {
},
}
s.checkUploadDownloadRequest(c, req, http.StatusOK, "download", true,
- arvadostest.ActiveUserUUID, arvadostest.MultilevelCollection1, "dir1/subdir/file1")
+ arvadostest.ActiveUserUUID, arvadostest.MultilevelCollection1, arvadostest.MultilevelCollection1PDH, "dir1/subdir/file1")
}
u = mustParseURL("http://" + strings.Replace(arvadostest.FooCollectionPDH, "+", "-", 1) + ".keep-web.example/foo")
@@ -1368,7 +1409,7 @@ func (s *IntegrationSuite) TestDownloadLoggingPermission(c *check.C) {
},
}
s.checkUploadDownloadRequest(c, req, http.StatusOK, "download", true,
- arvadostest.ActiveUserUUID, arvadostest.FooCollection, "foo")
+ arvadostest.ActiveUserUUID, "", arvadostest.FooCollectionPDH, "foo")
}
func (s *IntegrationSuite) TestUploadLoggingPermission(c *check.C) {
@@ -1408,7 +1449,7 @@ func (s *IntegrationSuite) TestUploadLoggingPermission(c *check.C) {
Body: io.NopCloser(bytes.NewReader([]byte("bar"))),
}
s.checkUploadDownloadRequest(c, req, http.StatusCreated, "upload", adminperm,
- arvadostest.AdminUserUUID, coll.UUID, "bar")
+ arvadostest.AdminUserUUID, coll.UUID, "", "bar")
// Test user permission
req = &http.Request{
@@ -1422,7 +1463,7 @@ func (s *IntegrationSuite) TestUploadLoggingPermission(c *check.C) {
Body: io.NopCloser(bytes.NewReader([]byte("bar"))),
}
s.checkUploadDownloadRequest(c, req, http.StatusCreated, "upload", userperm,
- arvadostest.ActiveUserUUID, coll.UUID, "bar")
+ arvadostest.ActiveUserUUID, coll.UUID, "", "bar")
}
}
}
diff --git a/services/keep-web/s3.go b/services/keep-web/s3.go
index 1f458f8e5..381a1110b 100644
--- a/services/keep-web/s3.go
+++ b/services/keep-web/s3.go
@@ -27,9 +27,7 @@ import (
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
- "git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
- "git.arvados.org/arvados.git/sdk/go/keepclient"
"github.com/AdRoll/goamz/s3"
)
@@ -312,33 +310,17 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
return false
}
- var err error
- var fs arvados.CustomFileSystem
- var arvclient *arvadosclient.ArvadosClient
+ fs, sess, tokenUser, err := h.Cache.GetSession(token)
+ if err != nil {
+ s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
+ return true
+ }
if r.Method == http.MethodGet || r.Method == http.MethodHead {
- // Use a single session (cached FileSystem) across
- // multiple read requests.
- var sess *cachedSession
- fs, sess, err = h.Cache.GetSession(token)
- if err != nil {
- s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
- return true
- }
- arvclient = sess.arvadosclient
- } else {
// Create a FileSystem for this request, to avoid
// exposing incomplete write operations to concurrent
// requests.
- var kc *keepclient.KeepClient
- var release func()
- var client *arvados.Client
- arvclient, kc, client, release, err = h.getClients(r.Header.Get("X-Request-Id"), token)
- if err != nil {
- s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
- return true
- }
- defer release()
- fs = client.SiteFileSystem(kc)
+ client := sess.client.WithRequestID(r.Header.Get("X-Request-Id"))
+ fs = client.SiteFileSystem(sess.keepclient)
fs.ForwardSlashNameSubstitution(h.Cluster.Collections.ForwardSlashNameSubstitution)
}
@@ -418,12 +400,11 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
return true
}
- tokenUser, err := h.Cache.GetTokenUser(token)
if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
http.Error(w, "Not permitted", http.StatusForbidden)
return true
}
- h.logUploadOrDownload(r, arvclient, fs, fspath, nil, tokenUser)
+ h.logUploadOrDownload(r, sess.arvadosclient, fs, fspath, nil, tokenUser)
// shallow copy r, and change URL path
r := *r
@@ -514,12 +495,11 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
}
defer f.Close()
- tokenUser, err := h.Cache.GetTokenUser(token)
if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
http.Error(w, "Not permitted", http.StatusForbidden)
return true
}
- h.logUploadOrDownload(r, arvclient, fs, fspath, nil, tokenUser)
+ h.logUploadOrDownload(r, sess.arvadosclient, fs, fspath, nil, tokenUser)
_, err = io.Copy(f, r.Body)
if err != nil {
diff --git a/services/keep-web/server_test.go b/services/keep-web/server_test.go
index 61c540808..1305416c4 100644
--- a/services/keep-web/server_test.go
+++ b/services/keep-web/server_test.go
@@ -29,6 +29,7 @@ import (
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/httpserver"
"git.arvados.org/arvados.git/sdk/go/keepclient"
+ "github.com/prometheus/client_golang/prometheus"
check "gopkg.in/check.v1"
)
@@ -49,17 +50,17 @@ func (s *IntegrationSuite) TestNoToken(c *check.C) {
} {
hdr, body, _ := s.runCurl(c, token, "collections.example.com", "/collections/"+arvadostest.FooCollection+"/foo")
c.Check(hdr, check.Matches, `(?s)HTTP/1.1 404 Not Found\r\n.*`)
- c.Check(body, check.Equals, notFoundMessage+"\n")
+ c.Check(strings.TrimSpace(body), check.Equals, notFoundMessage)
if token != "" {
hdr, body, _ = s.runCurl(c, token, "collections.example.com", "/collections/download/"+arvadostest.FooCollection+"/"+token+"/foo")
c.Check(hdr, check.Matches, `(?s)HTTP/1.1 404 Not Found\r\n.*`)
- c.Check(body, check.Equals, notFoundMessage+"\n")
+ c.Check(strings.TrimSpace(body), check.Equals, notFoundMessage)
}
hdr, body, _ = s.runCurl(c, token, "collections.example.com", "/bad-route")
c.Check(hdr, check.Matches, `(?s)HTTP/1.1 404 Not Found\r\n.*`)
- c.Check(body, check.Equals, notFoundMessage+"\n")
+ c.Check(strings.TrimSpace(body), check.Equals, notFoundMessage)
}
}
@@ -92,7 +93,7 @@ func (s *IntegrationSuite) Test404(c *check.C) {
hdr, body, _ := s.runCurl(c, arvadostest.ActiveToken, "collections.example.com", uri)
c.Check(hdr, check.Matches, "(?s)HTTP/1.1 404 Not Found\r\n.*")
if len(body) > 0 {
- c.Check(body, check.Equals, notFoundMessage+"\n")
+ c.Check(strings.TrimSpace(body), check.Equals, notFoundMessage)
}
}
}
@@ -475,15 +476,7 @@ func (s *IntegrationSuite) TestMetrics(c *check.C) {
c.Check(summaries["request_duration_seconds/get/200"].SampleCount, check.Equals, "3")
c.Check(summaries["request_duration_seconds/get/404"].SampleCount, check.Equals, "1")
c.Check(summaries["time_to_status_seconds/get/404"].SampleCount, check.Equals, "1")
- c.Check(counters["arvados_keepweb_collectioncache_requests//"].Value, check.Equals, int64(2))
- c.Check(counters["arvados_keepweb_collectioncache_api_calls//"].Value, check.Equals, int64(2))
- c.Check(counters["arvados_keepweb_collectioncache_hits//"].Value, check.Equals, int64(1))
- c.Check(counters["arvados_keepweb_collectioncache_pdh_hits//"].Value, check.Equals, int64(1))
- c.Check(gauges["arvados_keepweb_collectioncache_cached_manifests//"].Value, check.Equals, float64(1))
- // FooCollection's cached manifest size is 45 ("1f4b0....+45")
- // plus one 51-byte blob signature; session fs counts 3 inodes
- // * 64 bytes.
- c.Check(gauges["arvados_keepweb_sessions_cached_collection_bytes//"].Value, check.Equals, float64(45+51+64*3))
+ c.Check(gauges["arvados_keepweb_sessions_cached_session_bytes//"].Value, check.Equals, float64(445))
// If the Host header indicates a collection, /metrics.json
// refers to a file in the collection -- the metrics handler
@@ -529,7 +522,7 @@ func (s *IntegrationSuite) SetUpTest(c *check.C) {
ctx := ctxlog.Context(context.Background(), logger)
- s.handler = newHandlerOrErrorHandler(ctx, cluster, cluster.SystemRootToken, nil).(*handler)
+ s.handler = newHandlerOrErrorHandler(ctx, cluster, cluster.SystemRootToken, prometheus.NewRegistry()).(*handler)
s.testServer = httptest.NewUnstartedServer(
httpserver.AddRequestIDs(
httpserver.LogRequests(
diff --git a/services/keep-web/webdav.go b/services/keep-web/webdav.go
index 501c355a7..0039f04ee 100644
--- a/services/keep-web/webdav.go
+++ b/services/keep-web/webdav.go
@@ -36,7 +36,10 @@ var (
// existence automatically so sequences like "mkcol foo; put foo/bar"
// work as expected.
type webdavFS struct {
- collfs arvados.FileSystem
+ collfs arvados.FileSystem
+ // prefix works like fs.Sub: Stat(name) calls
+ // Stat(prefix+name) in the wrapped filesystem.
+ prefix string
writing bool
// webdav PROPFIND reads the first few bytes of each file
// whose filename extension isn't recognized, which is
@@ -56,7 +59,7 @@ func (fs *webdavFS) makeparents(name string) {
}
dir = dir[:len(dir)-1]
fs.makeparents(dir)
- fs.collfs.Mkdir(dir, 0755)
+ fs.collfs.Mkdir(fs.prefix+dir, 0755)
}
func (fs *webdavFS) Mkdir(ctx context.Context, name string, perm os.FileMode) error {
@@ -65,7 +68,7 @@ func (fs *webdavFS) Mkdir(ctx context.Context, name string, perm os.FileMode) er
}
name = strings.TrimRight(name, "/")
fs.makeparents(name)
- return fs.collfs.Mkdir(name, 0755)
+ return fs.collfs.Mkdir(fs.prefix+name, 0755)
}
func (fs *webdavFS) OpenFile(ctx context.Context, name string, flag int, perm os.FileMode) (f webdav.File, err error) {
@@ -73,7 +76,7 @@ func (fs *webdavFS) OpenFile(ctx context.Context, name string, flag int, perm os
if writing {
fs.makeparents(name)
}
- f, err = fs.collfs.OpenFile(name, flag, perm)
+ f, err = fs.collfs.OpenFile(fs.prefix+name, flag, perm)
if !fs.writing {
// webdav module returns 404 on all OpenFile errors,
// but returns 405 Method Not Allowed if OpenFile()
@@ -93,7 +96,7 @@ func (fs *webdavFS) OpenFile(ctx context.Context, name string, flag int, perm os
}
func (fs *webdavFS) RemoveAll(ctx context.Context, name string) error {
- return fs.collfs.RemoveAll(name)
+ return fs.collfs.RemoveAll(fs.prefix + name)
}
func (fs *webdavFS) Rename(ctx context.Context, oldName, newName string) error {
@@ -106,14 +109,14 @@ func (fs *webdavFS) Rename(ctx context.Context, oldName, newName string) error {
newName = strings.TrimSuffix(newName, "/")
}
fs.makeparents(newName)
- return fs.collfs.Rename(oldName, newName)
+ return fs.collfs.Rename(fs.prefix+oldName, fs.prefix+newName)
}
func (fs *webdavFS) Stat(ctx context.Context, name string) (os.FileInfo, error) {
if fs.writing {
fs.makeparents(name)
}
- return fs.collfs.Stat(name)
+ return fs.collfs.Stat(fs.prefix + name)
}
type writeFailer struct {
diff --git a/services/keepproxy/keepproxy_test.go b/services/keepproxy/keepproxy_test.go
index 8242f5b2b..38231307b 100644
--- a/services/keepproxy/keepproxy_test.go
+++ b/services/keepproxy/keepproxy_test.go
@@ -696,7 +696,7 @@ func (s *ServerRequiredSuite) TestCollectionSharingToken(c *C) {
defer srv.Close()
hash, _, err := kc.PutB([]byte("shareddata"))
c.Check(err, IsNil)
- kc.Arvados.ApiToken = arvadostest.FooCollectionSharingToken
+ kc.Arvados.ApiToken = arvadostest.FooFileCollectionSharingToken
rdr, _, _, err := kc.Get(hash)
c.Assert(err, IsNil)
data, err := ioutil.ReadAll(rdr)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list