[ARVADOS] created: 2.1.0-1834-gce4196cd7
Git user
git at public.arvados.org
Tue Jan 25 16:16:04 UTC 2022
at ce4196cd7f7b203396626ac09d98601c4fb22f01 (commit)
commit ce4196cd7f7b203396626ac09d98601c4fb22f01
Author: Tom Clegg <tom at curii.com>
Date: Tue Jan 25 11:15:51 2022 -0500
16727: Refresh block permission signatures on Sync and Read.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/arvados/blob_signature.go b/sdk/go/arvados/blob_signature.go
index 47b31a18e..9a031face 100644
--- a/sdk/go/arvados/blob_signature.go
+++ b/sdk/go/arvados/blob_signature.go
@@ -16,6 +16,7 @@ import (
"fmt"
"regexp"
"strconv"
+ "strings"
"time"
)
@@ -126,3 +127,21 @@ func parseHexTimestamp(timestampHex string) (ts time.Time, err error) {
}
return ts, err
}
+
+// errNoSignature is returned by signatureExpiryTime when the given
+// locator does not match SignedLocatorRe.
+var errNoSignature = errors.New("locator has no signature")
+
+// signatureExpiryTime returns the expiry time encoded in the
+// permission signature of signedLocator. It returns errNoSignature
+// if the locator does not match SignedLocatorRe, or the error from
+// parseHexTimestamp if the timestamp cannot be decoded.
+func signatureExpiryTime(signedLocator string) (time.Time, error) {
+	if m := SignedLocatorRe.FindStringSubmatch(signedLocator); m != nil {
+		// Submatch 7 is the hex-encoded expiry timestamp.
+		return parseHexTimestamp(m[7])
+	}
+	return time.Time{}, errNoSignature
+}
+
+// stripAllHints returns the part of locator before the first '+',
+// i.e., the locator with all "+hint" suffixes removed. A locator
+// containing no '+', or starting with '+', is returned unchanged.
+func stripAllHints(locator string) string {
+	cut := strings.IndexByte(locator, '+')
+	if cut <= 0 {
+		return locator
+	}
+	return locator[:cut]
+}
diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index 2b5df76ad..d39805f3f 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -43,9 +43,13 @@ type CollectionFileSystem interface {
+// collectionFileSystem is the CollectionFileSystem implementation
+// returned by (*Collection)FileSystem.
type collectionFileSystem struct {
fileSystem
- uuid string
- replicas int
- storageClasses []string
+ uuid string
+ // savedPDH holds the portable data hash (a string) of the
+ // collection as last loaded by FileSystem or saved by Sync.
+ savedPDH atomic.Value
+ replicas int
+ storageClasses []string
+ // guessSignatureTTL is the longest remaining signature
+ // lifetime seen so far, used as an estimate of the server's
+ // signing TTL. signatureTimeLeft updates it while holding the
+ // root node's lock.
+ guessSignatureTTL time.Duration
+ // holdCheckChanges: checkChangesOnServer returns immediately
+ // (without walking the tree) until this time.
+ holdCheckChanges time.Time
+ // lockCheckChanges guards holdCheckChanges.
+ lockCheckChanges sync.Mutex
}
// FileSystem returns a CollectionFileSystem for the collection.
@@ -62,6 +66,7 @@ func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFile
thr: newThrottle(concurrentWriters),
},
}
+ fs.savedPDH.Store(c.PortableDataHash)
if r := c.ReplicationDesired; r != nil {
fs.replicas = *r
}
@@ -85,18 +90,109 @@ func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFile
return fs, nil
}
-func backdateTree(n inode, modTime time.Time) {
-	switch n := n.(type) {
-	case *filenode:
-		n.fileinfo.modTime = modTime
-	case *dirnode:
-		n.fileinfo.modTime = modTime
-		for _, n := range n.inodes {
-			backdateTree(n, modTime)
-		}
-	}
-}
+// eachNode visits n and, if n is a directory, everything beneath
+// it: ffunc is called for each *filenode and dfunc for each *dirnode
+// (a directory is visited before its children). A nil ffunc or dfunc
+// is skipped. Nodes of any other type are ignored.
+//
+// Caller must have lock (or guarantee no concurrent accesses somehow).
+func eachNode(n inode, ffunc func(*filenode), dfunc func(*dirnode)) {
+	if file, isFile := n.(*filenode); isFile {
+		if ffunc != nil {
+			ffunc(file)
+		}
+		return
+	}
+	dir, isDir := n.(*dirnode)
+	if !isDir {
+		return
+	}
+	if dfunc != nil {
+		dfunc(dir)
+	}
+	for _, child := range dir.inodes {
+		eachNode(child, ffunc, dfunc)
+	}
+}
+
+// backdateTree sets the modification time of n, and of every file
+// and directory beneath it, to modTime.
+//
+// Caller must have lock (or guarantee no concurrent accesses somehow).
+func backdateTree(n inode, modTime time.Time) {
+	setFileTime := func(file *filenode) { file.fileinfo.modTime = modTime }
+	setDirTime := func(dir *dirnode) { dir.fileinfo.modTime = modTime }
+	eachNode(n, setFileTime, setDirTime)
+}
+
+// signatureTimeLeft returns the approximate portion of the signature
+// TTL remaining on the soonest-expiring signed block locator in the
+// collection -- usually between 0 and 1, or negative if some
+// signatures have already expired -- along with the estimated TTL.
+//
+// The server's signing TTL is not known directly, so the estimate is
+// the longest remaining lifetime observed so far
+// (fs.guessSignatureTTL, which this func updates). Locators without
+// a parseable signature are ignored; if there are none at all, the
+// result is (1, 1).
+//
+// It takes the root node's read lock, each filenode's lock, and then
+// the root node's write lock, so (presumably) the caller must not
+// already hold any of them.
+func (fs *collectionFileSystem) signatureTimeLeft() (float64, time.Duration) {
+	var (
+		now      = time.Now()
+		earliest time.Time // zero until the first signature is seen
+		latest   time.Time
+	)
+	fs.fileSystem.root.RLock()
+	eachNode(fs.root, func(fn *filenode) {
+		fn.Lock()
+		defer fn.Unlock()
+		for _, seg := range fn.segments {
+			seg, ok := seg.(storedSegment)
+			if !ok {
+				continue
+			}
+			expiryTime, err := signatureExpiryTime(seg.locator)
+			if err != nil {
+				continue
+			}
+			// Use "first seen" instead of a far-future sentinel
+			// so an expiry time arbitrarily far in the future
+			// is still handled correctly.
+			if earliest.IsZero() || expiryTime.Before(earliest) {
+				earliest = expiryTime
+			}
+			if expiryTime.After(latest) {
+				latest = expiryTime
+			}
+		}
+	}, nil)
+	fs.fileSystem.root.RUnlock()
+
+	if latest.IsZero() {
+		// No signatures == 100% of TTL remaining.
+		return 1, 1
+	}
+
+	ttl := latest.Sub(now)
+	fs.fileSystem.root.Lock()
+	if ttl > fs.guessSignatureTTL {
+		fs.guessSignatureTTL = ttl
+	} else {
+		ttl = fs.guessSignatureTTL
+	}
+	fs.fileSystem.root.Unlock()
+
+	if ttl <= 0 {
+		// Every signature has already expired and there is no
+		// earlier TTL estimate. Dividing by zero here would
+		// yield -Inf or NaN, and NaN fails every threshold
+		// comparison callers make, so report "expired" instead.
+		return -1, 0
+	}
+	return earliest.Sub(now).Seconds() / ttl.Seconds(), ttl
+}
+
+// updateSignatures replaces the locator of every storedSegment in fs
+// with the signed locator for the same block (same hash, ignoring
+// hints) found in newmanifest. Segments whose block does not appear
+// in newmanifest are left unchanged.
+//
+// It takes the root node's write lock and each filenode's lock.
+// Callers that already hold locks run it in a goroutine so it
+// proceeds once they release them (see refreshSignature).
+func (fs *collectionFileSystem) updateSignatures(newmanifest string) {
+	newLoc := map[string]string{}
+	for _, tok := range regexp.MustCompile(`\S+`).FindAllString(newmanifest, -1) {
+		if mBlkRe.MatchString(tok) {
+			newLoc[stripAllHints(tok)] = tok
+		}
+	}
+	if len(newLoc) == 0 {
+		// Nothing to update (e.g., empty manifest): don't
+		// block every reader by taking the root write lock
+		// just to walk the whole tree.
+		return
+	}
+	fs.fileSystem.root.Lock()
+	defer fs.fileSystem.root.Unlock()
+	eachNode(fs.root, func(fn *filenode) {
+		fn.Lock()
+		defer fn.Unlock()
+		for idx, seg := range fn.segments {
+			seg, ok := seg.(storedSegment)
+			if !ok {
+				continue
+			}
+			loc, ok := newLoc[stripAllHints(seg.locator)]
+			if !ok {
+				continue
+			}
+			seg.locator = loc
+			fn.segments[idx] = seg
+		}
+	}, nil)
+}
+
func (fs *collectionFileSystem) newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error) {
if name == "" || name == "." || name == ".." {
return nil, ErrInvalidArgument
@@ -180,7 +276,88 @@ func (fs *collectionFileSystem) Truncate(int64) error {
return ErrInvalidOperation
}
+// checkChangesOnServer refreshes block signatures from the server
+// when less than half of the estimated signature TTL remains --
+// unless that has already been checked recently, in which case this
+// func is a no-op.
+//
+// It tries the collection UUID (if any) first, then the last known
+// portable data hash, and stops as soon as signatures are fresh
+// enough. Fetch errors are ignored (the refresh is best-effort), so
+// the returned error is always nil.
+func (fs *collectionFileSystem) checkChangesOnServer() error {
+	// atomic.Value.Load returns nil if nothing was ever stored;
+	// treat that as "no PDH" instead of panicking on an
+	// unchecked type assertion.
+	savedPDH, _ := fs.savedPDH.Load().(string)
+	if fs.uuid == "" && savedPDH == "" {
+		return nil
+	}
+
+	// First try UUID if any, then last known PDH. Stop if all
+	// signatures are new enough. Once a refresh is underway
+	// (checkingAll), ignore the hold time so the PDH fallback
+	// still runs.
+	checkingAll := false
+	for _, id := range []string{fs.uuid, savedPDH} {
+		if id == "" {
+			continue
+		}
+
+		fs.lockCheckChanges.Lock()
+		if !checkingAll && fs.holdCheckChanges.After(time.Now()) {
+			fs.lockCheckChanges.Unlock()
+			return nil
+		}
+		remain, ttl := fs.signatureTimeLeft()
+		if remain > 0.01 && !checkingAll {
+			// Don't recheck until ~1% of the TTL elapses.
+			fs.holdCheckChanges = time.Now().Add(ttl / 100)
+		}
+		fs.lockCheckChanges.Unlock()
+
+		if remain >= 0.5 {
+			break
+		}
+		checkingAll = true
+		var coll Collection
+		err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}})
+		if err != nil {
+			// Best effort: fall through to the next ID.
+			continue
+		}
+		fs.updateSignatures(coll.ManifestText)
+	}
+	return nil
+}
+
+// refreshSignature returns locator with a fresh signature if its
+// current one is about to expire. Assume caller has lock.
+//
+// If the signature expires more than a minute from now, or locator
+// has no recognizable signature, locator is returned unchanged and
+// checkChangesOnServer is started in the background. Otherwise the
+// collection is fetched synchronously (by UUID and by last known
+// PDH), and the matching signed locator from the fetched manifest(s)
+// is returned; if none matches, locator is returned as is. The
+// fetched manifests are also used to update all other storedSegments
+// in a background goroutine, which proceeds once the caller releases
+// its locks.
+func (fs *collectionFileSystem) refreshSignature(locator string) string {
+	exp, err := signatureExpiryTime(locator)
+	if err != nil || time.Until(exp) > time.Minute {
+		// Synchronous update is not needed. Start an
+		// asynchronous update if needed.
+		go fs.checkChangesOnServer()
+		return locator
+	}
+	// Load returns nil if nothing was ever stored; avoid an
+	// unchecked type assertion.
+	savedPDH, _ := fs.savedPDH.Load().(string)
+	var manifests string
+	for _, id := range []string{fs.uuid, savedPDH} {
+		if id == "" {
+			continue
+		}
+		var coll Collection
+		err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}})
+		if err != nil {
+			continue
+		}
+		// Separate manifests so the last token of one cannot
+		// run into the first token of the next.
+		manifests += coll.ManifestText + "\n"
+	}
+	if manifests == "" {
+		// Nothing fetched: no fresher locator is available
+		// and there is nothing to update in the background.
+		return locator
+	}
+	hash := stripAllHints(locator)
+	for _, tok := range regexp.MustCompile(`\S+`).FindAllString(manifests, -1) {
+		if mBlkRe.MatchString(tok) && stripAllHints(tok) == hash {
+			locator = tok
+			break
+		}
+	}
+	go fs.updateSignatures(manifests)
+	return locator
+}
+
func (fs *collectionFileSystem) Sync() error {
+ err := fs.checkChangesOnServer()
+ if err != nil {
+ return err
+ }
if fs.uuid == "" {
return nil
}
@@ -188,19 +365,34 @@ func (fs *collectionFileSystem) Sync() error {
if err != nil {
return fmt.Errorf("sync failed: %s", err)
}
- coll := &Collection{
+ if PortableDataHash(txt) == fs.savedPDH.Load() {
+ // No local changes since last save or initial load.
+ return nil
+ }
+ coll := Collection{
UUID: fs.uuid,
ManifestText: txt,
}
- err = fs.RequestAndDecode(nil, "PUT", "arvados/v1/collections/"+fs.uuid, nil, map[string]interface{}{
+
+ selectFields := []string{"uuid", "portable_data_hash"}
+ fs.lockCheckChanges.Lock()
+ remain, _ := fs.signatureTimeLeft()
+ fs.lockCheckChanges.Unlock()
+ if remain < 0.5 {
+ selectFields = append(selectFields, "manifest_text")
+ }
+
+ err = fs.RequestAndDecode(&coll, "PUT", "arvados/v1/collections/"+fs.uuid, nil, map[string]interface{}{
"collection": map[string]string{
"manifest_text": coll.ManifestText,
},
- "select": []string{"uuid"},
+ "select": selectFields,
})
if err != nil {
return fmt.Errorf("sync failed: update %s: %s", fs.uuid, err)
}
+ fs.updateSignatures(coll.ManifestText)
+ fs.savedPDH.Store(coll.PortableDataHash)
return nil
}
@@ -375,6 +567,10 @@ func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr
err = io.EOF
return
}
+ if ss, ok := fn.segments[ptr.segmentIdx].(storedSegment); ok {
+ ss.locator = fn.fs.refreshSignature(ss.locator)
+ fn.segments[ptr.segmentIdx] = ss
+ }
n, err = fn.segments[ptr.segmentIdx].ReadAt(p, int64(ptr.segmentOff))
if n > 0 {
ptr.off += int64(n)
diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index beb4d61fc..fab91d1f7 100644
--- a/sdk/go/arvados/fs_collection_test.go
+++ b/sdk/go/arvados/fs_collection_test.go
@@ -32,6 +32,7 @@ var _ = check.Suite(&CollectionFSSuite{})
type keepClientStub struct {
blocks map[string][]byte
refreshable map[string]bool
+ reads []string // locators from ReadAt() calls
onWrite func(bufcopy []byte) // called from WriteBlock, before acquiring lock
authToken string // client's auth token (used for signing locators)
sigkey string // blob signing key
@@ -42,8 +43,14 @@ type keepClientStub struct {
var errStub404 = errors.New("404 block not found")
func (kcs *keepClientStub) ReadAt(locator string, p []byte, off int) (int, error) {
+ kcs.Lock()
+ kcs.reads = append(kcs.reads, locator)
+ kcs.Unlock()
kcs.RLock()
defer kcs.RUnlock()
+ if err := VerifySignature(locator, kcs.authToken, kcs.sigttl, []byte(kcs.sigkey)); err != nil {
+ return 0, err
+ }
buf := kcs.blocks[locator[:32]]
if buf == nil {
return 0, errStub404
@@ -102,6 +109,7 @@ type CollectionFSSuite struct {
func (s *CollectionFSSuite) SetUpTest(c *check.C) {
s.client = NewClientFromEnv()
+ s.client.AuthToken = fixtureActiveToken
err := s.client.RequestAndDecode(&s.coll, "GET", "arvados/v1/collections/"+fixtureFooAndBarFilesInDirUUID, nil, nil)
c.Assert(err, check.IsNil)
s.kc = &keepClientStub{
@@ -1433,6 +1441,103 @@ func (s *CollectionFSSuite) TestEdgeCaseManifests(c *check.C) {
}
}
+func (s *CollectionFSSuite) TestRefreshSignatures(c *check.C) {
+	// readLog returns a snapshot of the locators passed to the
+	// keep client stub's ReadAt so far. ReadAt appends to
+	// s.kc.reads while holding s.kc's lock, so read it under the
+	// same lock.
+	readLog := func() []string {
+		s.kc.RLock()
+		defer s.kc.RUnlock()
+		return append([]string(nil), s.kc.reads...)
+	}
+
+	filedata1 := "hello refresh signatures world\n"
+	fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+	c.Assert(err, check.IsNil)
+	err = fs.Mkdir("d1", 0700)
+	c.Assert(err, check.IsNil)
+	f, err := fs.OpenFile("d1/file1", os.O_CREATE|os.O_RDWR, 0700)
+	c.Assert(err, check.IsNil)
+	_, err = f.Write([]byte(filedata1))
+	c.Assert(err, check.IsNil)
+	err = f.Close()
+	c.Assert(err, check.IsNil)
+
+	filedata2 := "hello refresh signatures universe\n"
+	err = fs.Mkdir("d2", 0700)
+	c.Assert(err, check.IsNil)
+	f, err = fs.OpenFile("d2/file2", os.O_CREATE|os.O_RDWR, 0700)
+	c.Assert(err, check.IsNil)
+	_, err = f.Write([]byte(filedata2))
+	c.Assert(err, check.IsNil)
+	err = f.Close()
+	c.Assert(err, check.IsNil)
+	txt, err := fs.MarshalManifest(".")
+	c.Assert(err, check.IsNil)
+	var saved Collection
+	err = s.client.RequestAndDecode(&saved, "POST", "arvados/v1/collections", nil, map[string]interface{}{
+		"select": []string{"manifest_text", "uuid", "portable_data_hash"},
+		"collection": map[string]interface{}{
+			"manifest_text": txt,
+		},
+	})
+	c.Assert(err, check.IsNil)
+
+	// Update signatures synchronously if they are already expired
+	// when Read() is called.
+	{
+		saved.ManifestText = SignManifest(saved.ManifestText, s.kc.authToken, time.Now().Add(-2*time.Second), s.kc.sigttl, []byte(s.kc.sigkey))
+		fs, err := saved.FileSystem(s.client, s.kc)
+		c.Assert(err, check.IsNil)
+		f, err := fs.OpenFile("d1/file1", os.O_RDONLY, 0)
+		c.Assert(err, check.IsNil)
+		buf, err := ioutil.ReadAll(f)
+		c.Check(err, check.IsNil)
+		c.Check(string(buf), check.Equals, filedata1)
+	}
+
+	// Update signatures asynchronously if we're more than half
+	// way to TTL when Read() is called.
+	{
+		exp := time.Now().Add(2 * time.Minute)
+		saved.ManifestText = SignManifest(saved.ManifestText, s.kc.authToken, exp, s.kc.sigttl, []byte(s.kc.sigkey))
+		fs, err := saved.FileSystem(s.client, s.kc)
+		c.Assert(err, check.IsNil)
+		f1, err := fs.OpenFile("d1/file1", os.O_RDONLY, 0)
+		c.Assert(err, check.IsNil)
+		f2, err := fs.OpenFile("d2/file2", os.O_RDONLY, 0)
+		c.Assert(err, check.IsNil)
+		buf, err := ioutil.ReadAll(f1)
+		c.Check(err, check.IsNil)
+		c.Check(string(buf), check.Equals, filedata1)
+
+		// Ensure fs treats the 2-minute TTL as less than half
+		// the server's signing TTL. If we don't do this,
+		// collectionfs will guess the signature is fresh,
+		// i.e., signing TTL is 2 minutes, and won't do an
+		// async refresh.
+		//
+		// The Read above may already have started background
+		// goroutines that access guessSignatureTTL under the
+		// root node's lock, so set it under that lock too.
+		cfs := fs.(*collectionFileSystem)
+		cfs.fileSystem.root.Lock()
+		cfs.guessSignatureTTL = time.Hour
+		cfs.fileSystem.root.Unlock()
+
+		refreshed := false
+		for deadline := time.Now().Add(time.Second * 10); time.Now().Before(deadline) && !refreshed; time.Sleep(time.Second / 10) {
+			_, err = f1.Seek(0, io.SeekStart)
+			c.Assert(err, check.IsNil)
+			buf, err = ioutil.ReadAll(f1)
+			c.Assert(err, check.IsNil)
+			c.Assert(string(buf), check.Equals, filedata1)
+			reads := readLog()
+			loc := reads[len(reads)-1]
+			t, err := signatureExpiryTime(loc)
+			c.Assert(err, check.IsNil)
+			c.Logf("last read block %s had signature expiry time %v", loc, t)
+			if time.Until(t) > time.Hour {
+				refreshed = true
+			}
+		}
+		c.Check(refreshed, check.Equals, true)
+
+		// Second locator should have been updated at the same
+		// time.
+		buf, err = ioutil.ReadAll(f2)
+		c.Assert(err, check.IsNil)
+		c.Assert(string(buf), check.Equals, filedata2)
+		reads := readLog()
+		loc := reads[len(reads)-1]
+		c.Check(loc, check.Not(check.Equals), reads[len(reads)-2])
+		t, err := signatureExpiryTime(loc)
+		c.Assert(err, check.IsNil)
+		c.Logf("last read block %s had signature expiry time %v", loc, t)
+		c.Check(time.Until(t) > time.Hour, check.Equals, true)
+	}
+}
+
var bigmanifest = func() string {
var buf bytes.Buffer
for i := 0; i < 2000; i++ {
diff --git a/sdk/go/arvados/fs_project_test.go b/sdk/go/arvados/fs_project_test.go
index f68e7c8b0..894351327 100644
--- a/sdk/go/arvados/fs_project_test.go
+++ b/sdk/go/arvados/fs_project_test.go
@@ -295,6 +295,11 @@ func (s *SiteFSSuite) TestProjectUpdatedByOther(c *check.C) {
err = s.client.RequestAndDecode(nil, "DELETE", "arvados/v1/collections/"+oob.UUID, nil, nil)
c.Assert(err, check.IsNil)
+ wf, err = s.fs.OpenFile("/home/A Project/oob/test.txt", os.O_CREATE|os.O_RDWR, 0700)
+ c.Assert(err, check.IsNil)
+ err = wf.Close()
+ c.Check(err, check.IsNil)
+
err = project.Sync()
c.Check(err, check.NotNil) // can't update the deleted collection
_, err = s.fs.Open("/home/A Project/oob")
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list