[ARVADOS] created: 2.1.0-1834-gce4196cd7

Git user git at public.arvados.org
Tue Jan 25 16:16:04 UTC 2022


        at  ce4196cd7f7b203396626ac09d98601c4fb22f01 (commit)


commit ce4196cd7f7b203396626ac09d98601c4fb22f01
Author: Tom Clegg <tom at curii.com>
Date:   Tue Jan 25 11:15:51 2022 -0500

    16727: Refresh block permission signatures on Sync and Read.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/arvados/blob_signature.go b/sdk/go/arvados/blob_signature.go
index 47b31a18e..9a031face 100644
--- a/sdk/go/arvados/blob_signature.go
+++ b/sdk/go/arvados/blob_signature.go
@@ -16,6 +16,7 @@ import (
 	"fmt"
 	"regexp"
 	"strconv"
+	"strings"
 	"time"
 )
 
@@ -126,3 +127,21 @@ func parseHexTimestamp(timestampHex string) (ts time.Time, err error) {
 	}
 	return ts, err
 }
+
+var errNoSignature = errors.New("locator has no signature")
+
+func signatureExpiryTime(signedLocator string) (time.Time, error) {
+	matches := SignedLocatorRe.FindStringSubmatch(signedLocator)
+	if matches == nil {
+		return time.Time{}, errNoSignature
+	}
+	expiryHex := matches[7]
+	return parseHexTimestamp(expiryHex)
+}
+
+func stripAllHints(locator string) string {
+	if i := strings.IndexRune(locator, '+'); i > 0 {
+		return locator[:i]
+	}
+	return locator
+}
diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index 2b5df76ad..d39805f3f 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -43,9 +43,13 @@ type CollectionFileSystem interface {
 
 type collectionFileSystem struct {
 	fileSystem
-	uuid           string
-	replicas       int
-	storageClasses []string
+	uuid              string
+	savedPDH          atomic.Value
+	replicas          int
+	storageClasses    []string
+	guessSignatureTTL time.Duration
+	holdCheckChanges  time.Time
+	lockCheckChanges  sync.Mutex
 }
 
 // FileSystem returns a CollectionFileSystem for the collection.
@@ -62,6 +66,7 @@ func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFile
 			thr:       newThrottle(concurrentWriters),
 		},
 	}
+	fs.savedPDH.Store(c.PortableDataHash)
 	if r := c.ReplicationDesired; r != nil {
 		fs.replicas = *r
 	}
@@ -85,18 +90,109 @@ func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFile
 	return fs, nil
 }
 
-func backdateTree(n inode, modTime time.Time) {
+// caller must have lock (or guarantee no concurrent accesses somehow)
+func eachNode(n inode, ffunc func(*filenode), dfunc func(*dirnode)) {
 	switch n := n.(type) {
 	case *filenode:
-		n.fileinfo.modTime = modTime
+		if ffunc != nil {
+			ffunc(n)
+		}
 	case *dirnode:
-		n.fileinfo.modTime = modTime
+		if dfunc != nil {
+			dfunc(n)
+		}
 		for _, n := range n.inodes {
-			backdateTree(n, modTime)
+			eachNode(n, ffunc, dfunc)
 		}
 	}
 }
 
+// caller must have lock (or guarantee no concurrent accesses somehow)
+func backdateTree(n inode, modTime time.Time) {
+	eachNode(n, func(fn *filenode) {
+		fn.fileinfo.modTime = modTime
+	}, func(dn *dirnode) {
+		dn.fileinfo.modTime = modTime
+	})
+}
+
+// Approximate portion of signature TTL remaining, usually between 0
+// and 1, or negative if some signatures have expired.
+func (fs *collectionFileSystem) signatureTimeLeft() (float64, time.Duration) {
+	var (
+		now      = time.Now()
+		earliest = now.Add(time.Hour * 24 * 7 * 365)
+		latest   time.Time
+	)
+	fs.fileSystem.root.RLock()
+	eachNode(fs.root, func(fn *filenode) {
+		fn.Lock()
+		defer fn.Unlock()
+		for _, seg := range fn.segments {
+			seg, ok := seg.(storedSegment)
+			if !ok {
+				continue
+			}
+			expiryTime, err := signatureExpiryTime(seg.locator)
+			if err != nil {
+				continue
+			}
+			if expiryTime.Before(earliest) {
+				earliest = expiryTime
+			}
+			if expiryTime.After(latest) {
+				latest = expiryTime
+			}
+		}
+	}, nil)
+	fs.fileSystem.root.RUnlock()
+
+	if latest.IsZero() {
+		// No signatures == 100% of TTL remaining.
+		return 1, 1
+	}
+
+	ttl := latest.Sub(now)
+	fs.fileSystem.root.Lock()
+	{
+		if ttl > fs.guessSignatureTTL {
+			fs.guessSignatureTTL = ttl
+		} else {
+			ttl = fs.guessSignatureTTL
+		}
+	}
+	fs.fileSystem.root.Unlock()
+
+	return earliest.Sub(now).Seconds() / ttl.Seconds(), ttl
+}
+
+func (fs *collectionFileSystem) updateSignatures(newmanifest string) {
+	newLoc := map[string]string{}
+	for _, tok := range regexp.MustCompile(`\S+`).FindAllString(newmanifest, -1) {
+		if mBlkRe.MatchString(tok) {
+			newLoc[stripAllHints(tok)] = tok
+		}
+	}
+	fs.fileSystem.root.Lock()
+	defer fs.fileSystem.root.Unlock()
+	eachNode(fs.root, func(fn *filenode) {
+		fn.Lock()
+		defer fn.Unlock()
+		for idx, seg := range fn.segments {
+			seg, ok := seg.(storedSegment)
+			if !ok {
+				continue
+			}
+			loc, ok := newLoc[stripAllHints(seg.locator)]
+			if !ok {
+				continue
+			}
+			seg.locator = loc
+			fn.segments[idx] = seg
+		}
+	}, nil)
+}
+
 func (fs *collectionFileSystem) newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error) {
 	if name == "" || name == "." || name == ".." {
 		return nil, ErrInvalidArgument
@@ -180,7 +276,88 @@ func (fs *collectionFileSystem) Truncate(int64) error {
 	return ErrInvalidOperation
 }
 
+// Check for and incorporate upstream changes -- unless that has
+// already been done recently, in which case this func is a no-op.
+func (fs *collectionFileSystem) checkChangesOnServer() error {
+	if fs.uuid == "" && fs.savedPDH.Load() == "" {
+		return nil
+	}
+
+	// First try UUID if any, then last known PDH. Stop if all
+	// signatures are new enough.
+	checkingAll := false
+	for _, id := range []string{fs.uuid, fs.savedPDH.Load().(string)} {
+		if id == "" {
+			continue
+		}
+
+		fs.lockCheckChanges.Lock()
+		if !checkingAll && fs.holdCheckChanges.After(time.Now()) {
+			fs.lockCheckChanges.Unlock()
+			return nil
+		}
+		remain, ttl := fs.signatureTimeLeft()
+		if remain > 0.01 && !checkingAll {
+			fs.holdCheckChanges = time.Now().Add(ttl / 100)
+		}
+		fs.lockCheckChanges.Unlock()
+
+		if remain >= 0.5 {
+			break
+		}
+		checkingAll = true
+		var coll Collection
+		err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}})
+		if err != nil {
+			continue
+		}
+		fs.updateSignatures(coll.ManifestText)
+	}
+	return nil
+}
+
+// Refresh signature on a single locator, if necessary. Assume caller
+// has lock. If an update is needed, and there are any storedSegments
+// whose signatures can be updated, start a background task to update
+// them asynchronously when the caller releases locks.
+func (fs *collectionFileSystem) refreshSignature(locator string) string {
+	exp, err := signatureExpiryTime(locator)
+	if err != nil || exp.Sub(time.Now()) > time.Minute {
+		// Synchronous update is not needed. Start an
+		// asynchronous update if needed.
+		go fs.checkChangesOnServer()
+		return locator
+	}
+	var manifests string
+	for _, id := range []string{fs.uuid, fs.savedPDH.Load().(string)} {
+		if id == "" {
+			continue
+		}
+		var coll Collection
+		err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}})
+		if err != nil {
+			continue
+		}
+		manifests += coll.ManifestText
+	}
+	hash := stripAllHints(locator)
+	for _, tok := range regexp.MustCompile(`\S+`).FindAllString(manifests, -1) {
+		if mBlkRe.MatchString(tok) {
+			if stripAllHints(tok) == hash {
+				locator = tok
+				break
+			}
+		}
+	}
+	go fs.updateSignatures(manifests)
+	return locator
+}
+
 func (fs *collectionFileSystem) Sync() error {
+	err := fs.checkChangesOnServer()
+	if err != nil {
+		return err
+	}
 	if fs.uuid == "" {
 		return nil
 	}
@@ -188,19 +365,34 @@ func (fs *collectionFileSystem) Sync() error {
 	if err != nil {
 		return fmt.Errorf("sync failed: %s", err)
 	}
-	coll := &Collection{
+	if PortableDataHash(txt) == fs.savedPDH.Load() {
+		// No local changes since last save or initial load.
+		return nil
+	}
+	coll := Collection{
 		UUID:         fs.uuid,
 		ManifestText: txt,
 	}
-	err = fs.RequestAndDecode(nil, "PUT", "arvados/v1/collections/"+fs.uuid, nil, map[string]interface{}{
+
+	selectFields := []string{"uuid", "portable_data_hash"}
+	fs.lockCheckChanges.Lock()
+	remain, _ := fs.signatureTimeLeft()
+	fs.lockCheckChanges.Unlock()
+	if remain < 0.5 {
+		selectFields = append(selectFields, "manifest_text")
+	}
+
+	err = fs.RequestAndDecode(&coll, "PUT", "arvados/v1/collections/"+fs.uuid, nil, map[string]interface{}{
 		"collection": map[string]string{
 			"manifest_text": coll.ManifestText,
 		},
-		"select": []string{"uuid"},
+		"select": selectFields,
 	})
 	if err != nil {
 		return fmt.Errorf("sync failed: update %s: %s", fs.uuid, err)
 	}
+	fs.updateSignatures(coll.ManifestText)
+	fs.savedPDH.Store(coll.PortableDataHash)
 	return nil
 }
 
@@ -375,6 +567,10 @@ func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr
 		err = io.EOF
 		return
 	}
+	if ss, ok := fn.segments[ptr.segmentIdx].(storedSegment); ok {
+		ss.locator = fn.fs.refreshSignature(ss.locator)
+		fn.segments[ptr.segmentIdx] = ss
+	}
 	n, err = fn.segments[ptr.segmentIdx].ReadAt(p, int64(ptr.segmentOff))
 	if n > 0 {
 		ptr.off += int64(n)
diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index beb4d61fc..fab91d1f7 100644
--- a/sdk/go/arvados/fs_collection_test.go
+++ b/sdk/go/arvados/fs_collection_test.go
@@ -32,6 +32,7 @@ var _ = check.Suite(&CollectionFSSuite{})
 type keepClientStub struct {
 	blocks      map[string][]byte
 	refreshable map[string]bool
+	reads       []string             // locators from ReadAt() calls
 	onWrite     func(bufcopy []byte) // called from WriteBlock, before acquiring lock
 	authToken   string               // client's auth token (used for signing locators)
 	sigkey      string               // blob signing key
@@ -42,8 +43,14 @@ type keepClientStub struct {
 var errStub404 = errors.New("404 block not found")
 
 func (kcs *keepClientStub) ReadAt(locator string, p []byte, off int) (int, error) {
+	kcs.Lock()
+	kcs.reads = append(kcs.reads, locator)
+	kcs.Unlock()
 	kcs.RLock()
 	defer kcs.RUnlock()
+	if err := VerifySignature(locator, kcs.authToken, kcs.sigttl, []byte(kcs.sigkey)); err != nil {
+		return 0, err
+	}
 	buf := kcs.blocks[locator[:32]]
 	if buf == nil {
 		return 0, errStub404
@@ -102,6 +109,7 @@ type CollectionFSSuite struct {
 
 func (s *CollectionFSSuite) SetUpTest(c *check.C) {
 	s.client = NewClientFromEnv()
+	s.client.AuthToken = fixtureActiveToken
 	err := s.client.RequestAndDecode(&s.coll, "GET", "arvados/v1/collections/"+fixtureFooAndBarFilesInDirUUID, nil, nil)
 	c.Assert(err, check.IsNil)
 	s.kc = &keepClientStub{
@@ -1433,6 +1441,103 @@ func (s *CollectionFSSuite) TestEdgeCaseManifests(c *check.C) {
 	}
 }
 
+func (s *CollectionFSSuite) TestRefreshSignatures(c *check.C) {
+	filedata1 := "hello refresh signatures world\n"
+	fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+	c.Assert(err, check.IsNil)
+	fs.Mkdir("d1", 0700)
+	f, err := fs.OpenFile("d1/file1", os.O_CREATE|os.O_RDWR, 0700)
+	c.Assert(err, check.IsNil)
+	_, err = f.Write([]byte(filedata1))
+	c.Assert(err, check.IsNil)
+	err = f.Close()
+	c.Assert(err, check.IsNil)
+
+	filedata2 := "hello refresh signatures universe\n"
+	fs.Mkdir("d2", 0700)
+	f, err = fs.OpenFile("d2/file2", os.O_CREATE|os.O_RDWR, 0700)
+	c.Assert(err, check.IsNil)
+	_, err = f.Write([]byte(filedata2))
+	c.Assert(err, check.IsNil)
+	err = f.Close()
+	c.Assert(err, check.IsNil)
+	txt, err := fs.MarshalManifest(".")
+	c.Assert(err, check.IsNil)
+	var saved Collection
+	err = s.client.RequestAndDecode(&saved, "POST", "arvados/v1/collections", nil, map[string]interface{}{
+		"select": []string{"manifest_text", "uuid", "portable_data_hash"},
+		"collection": map[string]interface{}{
+			"manifest_text": txt,
+		},
+	})
+	c.Assert(err, check.IsNil)
+
+	// Update signatures synchronously if they are already expired
+	// when Read() is called.
+	{
+		saved.ManifestText = SignManifest(saved.ManifestText, s.kc.authToken, time.Now().Add(-2*time.Second), s.kc.sigttl, []byte(s.kc.sigkey))
+		fs, err := saved.FileSystem(s.client, s.kc)
+		c.Assert(err, check.IsNil)
+		f, err := fs.OpenFile("d1/file1", os.O_RDONLY, 0)
+		c.Assert(err, check.IsNil)
+		buf, err := ioutil.ReadAll(f)
+		c.Check(err, check.IsNil)
+		c.Check(string(buf), check.Equals, filedata1)
+	}
+
+	// Update signatures asynchronously if we're more than half
+	// way to TTL when Read() is called.
+	{
+		exp := time.Now().Add(2 * time.Minute)
+		saved.ManifestText = SignManifest(saved.ManifestText, s.kc.authToken, exp, s.kc.sigttl, []byte(s.kc.sigkey))
+		fs, err := saved.FileSystem(s.client, s.kc)
+		c.Assert(err, check.IsNil)
+		f1, err := fs.OpenFile("d1/file1", os.O_RDONLY, 0)
+		c.Assert(err, check.IsNil)
+		f2, err := fs.OpenFile("d2/file2", os.O_RDONLY, 0)
+		c.Assert(err, check.IsNil)
+		buf, err := ioutil.ReadAll(f1)
+		c.Check(err, check.IsNil)
+		c.Check(string(buf), check.Equals, filedata1)
+
+		// Ensure fs treats the 2-minute TTL as less than half
+		// the server's signing TTL. If we don't do this,
+		// collectionfs will guess the signature is fresh,
+		// i.e., signing TTL is 2 minutes, and won't do an
+		// async refresh.
+		fs.(*collectionFileSystem).guessSignatureTTL = time.Hour
+
+		refreshed := false
+		for deadline := time.Now().Add(time.Second * 10); time.Now().Before(deadline) && !refreshed; time.Sleep(time.Second / 10) {
+			_, err = f1.Seek(0, io.SeekStart)
+			c.Assert(err, check.IsNil)
+			buf, err = ioutil.ReadAll(f1)
+			c.Assert(err, check.IsNil)
+			c.Assert(string(buf), check.Equals, filedata1)
+			loc := s.kc.reads[len(s.kc.reads)-1]
+			t, err := signatureExpiryTime(loc)
+			c.Assert(err, check.IsNil)
+			c.Logf("last read block %s had signature expiry time %v", loc, t)
+			if t.Sub(time.Now()) > time.Hour {
+				refreshed = true
+			}
+		}
+		c.Check(refreshed, check.Equals, true)
+
+		// Second locator should have been updated at the same
+		// time.
+		buf, err = ioutil.ReadAll(f2)
+		c.Assert(err, check.IsNil)
+		c.Assert(string(buf), check.Equals, filedata2)
+		loc := s.kc.reads[len(s.kc.reads)-1]
+		c.Check(loc, check.Not(check.Equals), s.kc.reads[len(s.kc.reads)-2])
+		t, err := signatureExpiryTime(s.kc.reads[len(s.kc.reads)-1])
+		c.Assert(err, check.IsNil)
+		c.Logf("last read block %s had signature expiry time %v", loc, t)
+		c.Check(t.Sub(time.Now()) > time.Hour, check.Equals, true)
+	}
+}
+
 var bigmanifest = func() string {
 	var buf bytes.Buffer
 	for i := 0; i < 2000; i++ {
diff --git a/sdk/go/arvados/fs_project_test.go b/sdk/go/arvados/fs_project_test.go
index f68e7c8b0..894351327 100644
--- a/sdk/go/arvados/fs_project_test.go
+++ b/sdk/go/arvados/fs_project_test.go
@@ -295,6 +295,11 @@ func (s *SiteFSSuite) TestProjectUpdatedByOther(c *check.C) {
 	err = s.client.RequestAndDecode(nil, "DELETE", "arvados/v1/collections/"+oob.UUID, nil, nil)
 	c.Assert(err, check.IsNil)
 
+	wf, err = s.fs.OpenFile("/home/A Project/oob/test.txt", os.O_CREATE|os.O_RDWR, 0700)
+	c.Assert(err, check.IsNil)
+	err = wf.Close()
+	c.Check(err, check.IsNil)
+
 	err = project.Sync()
 	c.Check(err, check.NotNil) // can't update the deleted collection
 	_, err = s.fs.Open("/home/A Project/oob")

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list