[arvados] updated: 2.7.0-5655-g46b917d759

git repository hosting git at public.arvados.org
Tue Dec 26 20:16:36 UTC 2023


Summary of changes:
 sdk/go/arvados/keep_cache.go         | 57 ++++++++++++++++++++++++++++--------
 sdk/go/arvados/keep_cache_test.go    |  6 ++--
 sdk/go/keepclient/keepclient_test.go | 11 +++----
 sdk/go/keepclient/support.go         |  6 +++-
 4 files changed, 59 insertions(+), 21 deletions(-)

       via  46b917d75926c5bd215d3584c85822dbda877981 (commit)
       via  0c0bbd5d130e3b23d8ec6a8434f7899622d33195 (commit)
       via  6f83707fd0517e591273263bc855433dc80a5d6f (commit)
      from  84e451a572012c115801e1a72fc95ec79b4640e8 (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit 46b917d75926c5bd215d3584c85822dbda877981
Author: Tom Clegg <tom at curii.com>
Date:   Tue Dec 26 15:16:28 2023 -0500

    20318: Use one tidying goroutine and filehandle pool per cache dir.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/arvados/keep_cache.go b/sdk/go/arvados/keep_cache.go
index a657153876..1d12f4cdc6 100644
--- a/sdk/go/arvados/keep_cache.go
+++ b/sdk/go/arvados/keep_cache.go
@@ -40,6 +40,26 @@ type DiskCache struct {
 	MaxSize ByteSizeOrPercent
 	Logger  logrus.FieldLogger
 
+	*sharedCache
+	setupOnce sync.Once
+}
+
+var (
+	sharedCachesLock sync.Mutex
+	sharedCaches     = map[string]*sharedCache{}
+)
+
+// sharedCache has fields that coordinate the cache usage in a single
+// cache directory; it can be shared by multiple DiskCaches.
+//
+// This serves to share a single pool of held-open filehandles, a
+// single tidying goroutine, etc., even when the program (like
+// keep-web) uses multiple KeepGateway stacks that use different auth
+// tokens, etc.
+type sharedCache struct {
+	dir     string
+	maxSize ByteSizeOrPercent
+
 	tidying        int32 // see tidy()
 	tidyHoldUntil  time.Time
 	defaultMaxSize int64
@@ -82,12 +102,22 @@ const (
 	tidyHoldDuration = 5 * time.Minute // time to re-check cache size even if estimated size is below max
 )
 
+func (cache *DiskCache) setup() {
+	sharedCachesLock.Lock()
+	defer sharedCachesLock.Unlock()
+	dir := cache.Dir
+	if sharedCaches[dir] == nil {
+		sharedCaches[dir] = &sharedCache{dir: dir, maxSize: cache.MaxSize}
+	}
+	cache.sharedCache = sharedCaches[dir]
+}
+
 func (cache *DiskCache) cacheFile(locator string) string {
 	hash := locator
 	if i := strings.Index(hash, "+"); i > 0 {
 		hash = hash[:i]
 	}
-	return filepath.Join(cache.Dir, hash[:3], hash+cacheFileSuffix)
+	return filepath.Join(cache.dir, hash[:3], hash+cacheFileSuffix)
 }
 
 // Open a cache file, creating the parent dir if necessary.
@@ -126,8 +156,9 @@ func (cache *DiskCache) debugf(format string, args ...interface{}) {
 // BlockWrite writes through to the wrapped KeepGateway, and (if
 // possible) retains a copy of the written block in the cache.
 func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
+	cache.setupOnce.Do(cache.setup)
 	unique := fmt.Sprintf("%x.%p%s", os.Getpid(), &opts, tmpFileSuffix)
-	tmpfilename := filepath.Join(cache.Dir, "tmp", unique)
+	tmpfilename := filepath.Join(cache.dir, "tmp", unique)
 	tmpfile, err := cache.openFile(tmpfilename, os.O_CREATE|os.O_EXCL|os.O_RDWR)
 	if err != nil {
 		cache.debugf("BlockWrite: open(%s) failed: %s", tmpfilename, err)
@@ -235,6 +266,7 @@ func (fw funcwriter) Write(p []byte) (int, error) {
 // cache. The remainder of the block may continue to be copied into
 // the cache in the background.
 func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
+	cache.setupOnce.Do(cache.setup)
 	cachefilename := cache.cacheFile(locator)
 	if n, err := cache.quickReadAt(cachefilename, dst, offset); err == nil {
 		return n, err
@@ -382,11 +414,11 @@ func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int
 		lim := syscall.Rlimit{}
 		err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &lim)
 		if err != nil {
-			cache.heldopenMax = 256
-		} else if lim.Cur > 40000 {
+			cache.heldopenMax = 100
+		} else if lim.Cur > 400000 {
 			cache.heldopenMax = 10000
 		} else {
-			cache.heldopenMax = int(lim.Cur / 4)
+			cache.heldopenMax = int(lim.Cur / 40)
 		}
 	}
 	heldopen := cache.heldopen[cachefilename]
@@ -483,6 +515,7 @@ func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int
 
 // BlockRead reads an entire block using a 128 KiB buffer.
 func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
+	cache.setupOnce.Do(cache.setup)
 	i := strings.Index(opts.Locator, "+")
 	if i < 0 || i >= len(opts.Locator) {
 		return 0, errors.New("invalid block locator: no size hint")
@@ -531,7 +564,7 @@ func (cache *DiskCache) gotidy() {
 		return
 	}
 	// Skip if sizeEstimated is based on an actual measurement and
-	// is below MaxSize, and we haven't reached the "recheck
+	// is below maxSize, and we haven't reached the "recheck
 	// anyway" time threshold.
 	if cache.sizeMeasured > 0 &&
 		atomic.LoadInt64(&cache.sizeEstimated) < atomic.LoadInt64(&cache.defaultMaxSize) &&
@@ -548,19 +581,19 @@ func (cache *DiskCache) gotidy() {
 
 // Delete cache files as needed to control disk usage.
 func (cache *DiskCache) tidy() {
-	maxsize := int64(cache.MaxSize.ByteSize())
+	maxsize := int64(cache.maxSize.ByteSize())
 	if maxsize < 1 {
 		maxsize = atomic.LoadInt64(&cache.defaultMaxSize)
 		if maxsize == 0 {
 			// defaultMaxSize not yet computed. Use 10% of
 			// filesystem capacity (or different
-			// percentage if indicated by cache.MaxSize)
-			pct := cache.MaxSize.Percent()
+			// percentage if indicated by cache.maxSize)
+			pct := cache.maxSize.Percent()
 			if pct == 0 {
 				pct = 10
 			}
 			var stat unix.Statfs_t
-			if nil == unix.Statfs(cache.Dir, &stat) {
+			if nil == unix.Statfs(cache.dir, &stat) {
 				maxsize = int64(stat.Bavail) * stat.Bsize * pct / 100
 				atomic.StoreInt64(&cache.defaultMaxSize, maxsize)
 			} else {
@@ -572,7 +605,7 @@ func (cache *DiskCache) tidy() {
 	}
 
 	// Bail if a tidy goroutine is running in a different process.
-	lockfile, err := cache.openFile(filepath.Join(cache.Dir, "tmp", "tidy.lock"), os.O_CREATE|os.O_WRONLY)
+	lockfile, err := cache.openFile(filepath.Join(cache.dir, "tmp", "tidy.lock"), os.O_CREATE|os.O_WRONLY)
 	if err != nil {
 		return
 	}
@@ -589,7 +622,7 @@ func (cache *DiskCache) tidy() {
 	}
 	var ents []entT
 	var totalsize int64
-	filepath.Walk(cache.Dir, func(path string, info fs.FileInfo, err error) error {
+	filepath.Walk(cache.dir, func(path string, info fs.FileInfo, err error) error {
 		if err != nil {
 			cache.debugf("tidy: skipping dir %s: %s", path, err)
 			return nil

commit 0c0bbd5d130e3b23d8ec6a8434f7899622d33195
Author: Tom Clegg <tom at curii.com>
Date:   Tue Dec 26 12:46:04 2023 -0500

    f config
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/arvados/keep_cache_test.go b/sdk/go/arvados/keep_cache_test.go
index e4d1790cac..ffca531cd7 100644
--- a/sdk/go/arvados/keep_cache_test.go
+++ b/sdk/go/arvados/keep_cache_test.go
@@ -201,7 +201,7 @@ func (s *keepCacheSuite) testConcurrentReaders(c *check.C, cannotRefresh, mangle
 	backend := &keepGatewayMemoryBacked{}
 	cache := DiskCache{
 		KeepGateway: backend,
-		MaxSize:     int64(blksize),
+		MaxSize:     ByteSizeOrPercent(blksize),
 		Dir:         c.MkDir(),
 		Logger:      ctxlog.TestLogger(c),
 	}
@@ -286,7 +286,7 @@ func (s *keepCacheSuite) TestStreaming(c *check.C) {
 	}
 	cache := DiskCache{
 		KeepGateway: backend,
-		MaxSize:     int64(blksize),
+		MaxSize:     ByteSizeOrPercent(blksize),
 		Dir:         c.MkDir(),
 		Logger:      ctxlog.TestLogger(c),
 	}
@@ -354,7 +354,7 @@ func (s *keepCacheBenchSuite) SetUpTest(c *check.C) {
 	s.backend = &keepGatewayMemoryBacked{}
 	s.cache = &DiskCache{
 		KeepGateway: s.backend,
-		MaxSize:     int64(s.blksize),
+		MaxSize:     ByteSizeOrPercent(s.blksize),
 		Dir:         c.MkDir(),
 		Logger:      ctxlog.TestLogger(c),
 	}

commit 6f83707fd0517e591273263bc855433dc80a5d6f
Author: Tom Clegg <tom at curii.com>
Date:   Tue Dec 26 12:25:34 2023 -0500

    20318: Fix write using Reader and unspecified Hash.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go
index a6e0a11d51..ad5d12b505 100644
--- a/sdk/go/keepclient/keepclient_test.go
+++ b/sdk/go/keepclient/keepclient_test.go
@@ -1101,19 +1101,20 @@ func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
 	}
 	{
 		hash2, replicas, err := kc.PutB(content)
+		c.Check(err, IsNil)
 		c.Check(hash2, Matches, fmt.Sprintf(`%s\+%d\b.*`, hash, len(content)))
 		c.Check(replicas, Equals, 2)
-		c.Check(err, Equals, nil)
 	}
 	{
 		r, n, url2, err := kc.Get(hash)
 		c.Check(err, Equals, nil)
 		c.Check(n, Equals, int64(len(content)))
 		c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
-
-		readContent, err2 := ioutil.ReadAll(r)
-		c.Check(err2, Equals, nil)
-		c.Check(readContent, DeepEquals, content)
+		if c.Check(r, NotNil) {
+			readContent, err2 := ioutil.ReadAll(r)
+			c.Check(err2, Equals, nil)
+			c.Check(readContent, DeepEquals, content)
+		}
 	}
 	{
 		n, url2, err := kc.Ask(hash)
diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go
index c4144bf871..6acaf64baa 100644
--- a/sdk/go/keepclient/support.go
+++ b/sdk/go/keepclient/support.go
@@ -149,8 +149,12 @@ func (kc *KeepClient) httpBlockWrite(ctx context.Context, req arvados.BlockWrite
 		getReader = func() io.Reader { return bytes.NewReader(req.Data[:req.DataSize]) }
 	} else {
 		buf := asyncbuf.NewBuffer(make([]byte, 0, req.DataSize))
+		reader := req.Reader
+		if req.Hash != "" {
+			reader = HashCheckingReader{req.Reader, md5.New(), req.Hash}
+		}
 		go func() {
-			_, err := io.Copy(buf, HashCheckingReader{req.Reader, md5.New(), req.Hash})
+			_, err := io.Copy(buf, reader)
 			buf.CloseWithError(err)
 		}()
 		getReader = buf.NewReader

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list