[ARVADOS] updated: 1.3.0-1718-g97e783760

Git user git@public.curoverse.com
Tue Oct 22 15:06:21 UTC 2019


Summary of changes:
 sdk/go/arvados/fs_base.go            |  8 ++++++++
 sdk/go/arvados/fs_collection.go      | 31 +++++++++++++------------------
 sdk/go/arvados/fs_collection_test.go | 10 +++++-----
 sdk/go/arvados/fs_site.go            |  2 ++
 4 files changed, 28 insertions(+), 23 deletions(-)

       via  97e783760423f6f0200b057629aa9cd593a5c9c7 (commit)
      from  5e109ae2b94f5dcaddb19da360507f73078e2d1c (commit)

The revisions listed above that are new to this repository have
not appeared in any other notification email, so they are listed
in full below.


commit 97e783760423f6f0200b057629aa9cd593a5c9c7
Author: Tom Clegg <tclegg@veritasgenetics.com>
Date:   Tue Oct 22 11:04:49 2019 -0400

    15652: Limit concurrent writes per filesystem, not per flush.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

diff --git a/sdk/go/arvados/fs_base.go b/sdk/go/arvados/fs_base.go
index 359e6b67e..d06aba369 100644
--- a/sdk/go/arvados/fs_base.go
+++ b/sdk/go/arvados/fs_base.go
@@ -58,6 +58,9 @@ type FileSystem interface {
 	// while locking multiple inodes.
 	locker() sync.Locker
 
+	// throttle for limiting concurrent background writers
+	throttle() *throttle
+
 	// create a new node with nil parent.
 	newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error)
 
@@ -300,12 +303,17 @@ type fileSystem struct {
 	root inode
 	fsBackend
 	mutex sync.Mutex
+	thr   *throttle
 }
 
 func (fs *fileSystem) rootnode() inode {
 	return fs.root
 }
 
+func (fs *fileSystem) throttle() *throttle {
+	return fs.thr
+}
+
 func (fs *fileSystem) locker() sync.Locker {
 	return &fs.mutex
 }
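
The throttle type referenced by this interface is not shown in the
diff. A minimal counting-semaphore sketch consistent with the
Acquire()/Release() calls in the hunks below (an assumed
implementation, not necessarily the actual sdk/go/arvados/throttle.go)
would be:

    package arvados

    // throttle is a counting semaphore: at most cap(c) concurrent
    // holders at a time.
    type throttle struct {
            c chan struct{}
    }

    func newThrottle(n int) *throttle {
            return &throttle{c: make(chan struct{}, n)}
    }

    // Acquire blocks until one of the n slots is free, then claims it.
    func (t *throttle) Acquire() { t.c <- struct{}{} }

    // Release frees a slot claimed by Acquire.
    func (t *throttle) Release() { <-t.c }

Putting throttle() on the FileSystem interface lets filenodes and
dirnodes reach the shared per-filesystem limiter via fn.fs.throttle()
instead of constructing their own.
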
diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index 0722da6b0..386408a10 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -21,8 +21,7 @@ import (
 
 var (
 	maxBlockSize      = 1 << 26
-	concurrentWriters = 4 // max goroutines writing to Keep during flush()
-	writeAheadBlocks  = 1 // max background jobs flushing to Keep before blocking writes
+	concurrentWriters = 4 // max goroutines writing to Keep in background and during flush()
 )
 
 // A CollectionFileSystem is a FileSystem that can be serialized as a
@@ -60,6 +59,7 @@ func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFile
 		uuid: c.UUID,
 		fileSystem: fileSystem{
 			fsBackend: keepBackend{apiClient: client, keepClient: kc},
+			thr:       newThrottle(concurrentWriters),
 		},
 	}
 	root := &dirnode{
@@ -170,7 +170,7 @@ func (fs *collectionFileSystem) Flush(path string, shortBlocks bool) error {
 		}
 		names = filenames
 	}
-	return dn.flush(context.TODO(), newThrottle(concurrentWriters), names, flushOpts{sync: false, shortBlocks: shortBlocks})
+	return dn.flush(context.TODO(), names, flushOpts{sync: false, shortBlocks: shortBlocks})
 }
 
 func (fs *collectionFileSystem) memorySize() int64 {
@@ -182,7 +182,7 @@ func (fs *collectionFileSystem) memorySize() int64 {
 func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
 	fs.fileSystem.root.Lock()
 	defer fs.fileSystem.root.Unlock()
-	return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix, newThrottle(concurrentWriters))
+	return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix)
 }
 
 func (fs *collectionFileSystem) Size() int64 {
@@ -274,7 +274,6 @@ type filenode struct {
 	memsize  int64 // bytes in memSegments
 	sync.RWMutex
 	nullnode
-	throttle *throttle
 }
 
 // caller must have lock
@@ -539,10 +538,6 @@ func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePt
 func (fn *filenode) pruneMemSegments() {
 	// TODO: share code with (*dirnode)flush()
 	// TODO: pack/flush small blocks too, when fragmented
-	if fn.throttle == nil {
-		// TODO: share a throttle with filesystem
-		fn.throttle = newThrottle(writeAheadBlocks)
-	}
 	for idx, seg := range fn.segments {
 		seg, ok := seg.(*memSegment)
 		if !ok || seg.Len() < maxBlockSize || seg.flushing != nil {
@@ -558,11 +553,11 @@ func (fn *filenode) pruneMemSegments() {
 		// progress, block here until one finishes, rather
 		// than pile up an unlimited number of buffered writes
 		// and network flush operations.
-		fn.throttle.Acquire()
+		fn.fs.throttle().Acquire()
 		go func() {
 			defer close(done)
 			locator, _, err := fn.FS().PutB(buf)
-			fn.throttle.Release()
+			fn.fs.throttle().Release()
 			fn.Lock()
 			defer fn.Unlock()
 			if seg.flushing != done {
@@ -763,7 +758,7 @@ type flushOpts struct {
 // Caller must have write lock on dn and the named children.
 //
 // If any children are dirs, they will be flushed recursively.
-func (dn *dirnode) flush(ctx context.Context, throttle *throttle, names []string, opts flushOpts) error {
+func (dn *dirnode) flush(ctx context.Context, names []string, opts flushOpts) error {
 	cg := newContextGroup(ctx)
 	defer cg.Cancel()
 
@@ -772,8 +767,8 @@ func (dn *dirnode) flush(ctx context.Context, throttle *throttle, names []string
 			return
 		}
 		cg.Go(func() error {
-			throttle.Acquire()
-			defer throttle.Release()
+			dn.fs.throttle().Acquire()
+			defer dn.fs.throttle().Release()
 			return dn.commitBlock(cg.Context(), refs, opts.sync)
 		})
 	}
@@ -790,7 +785,7 @@ func (dn *dirnode) flush(ctx context.Context, throttle *throttle, names []string
 				grandchild.Lock()
 				defer grandchild.Unlock()
 			}
-			cg.Go(func() error { return node.flush(cg.Context(), throttle, grandchildNames, opts) })
+			cg.Go(func() error { return node.flush(cg.Context(), grandchildNames, opts) })
 		case *filenode:
 			for idx, seg := range node.segments {
 				switch seg := seg.(type) {
@@ -862,7 +857,7 @@ func (dn *dirnode) sortedNames() []string {
 }
 
 // caller must have write lock.
-func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle *throttle) (string, error) {
+func (dn *dirnode) marshalManifest(ctx context.Context, prefix string) (string, error) {
 	cg := newContextGroup(ctx)
 	defer cg.Cancel()
 
@@ -909,7 +904,7 @@ func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle
 	for i, name := range dirnames {
 		i, name := i, name
 		cg.Go(func() error {
-			txt, err := dn.inodes[name].(*dirnode).marshalManifest(cg.Context(), prefix+"/"+name, throttle)
+			txt, err := dn.inodes[name].(*dirnode).marshalManifest(cg.Context(), prefix+"/"+name)
 			subdirs[i] = txt
 			return err
 		})
@@ -925,7 +920,7 @@ func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle
 
 		var fileparts []filepart
 		var blocks []string
-		if err := dn.flush(cg.Context(), throttle, filenames, flushOpts{sync: true, shortBlocks: true}); err != nil {
+		if err := dn.flush(cg.Context(), filenames, flushOpts{sync: true, shortBlocks: true}); err != nil {
 			return err
 		}
 		for _, name := range filenames {
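
Net effect in this file: background writes from pruneMemSegments() and
flush writes from dirnode.flush() now draw slots from the one
per-filesystem throttle. Previously each flush()/MarshalManifest()
call built its own newThrottle(concurrentWriters) and each filenode
lazily built a private writeAheadBlocks throttle, so several
concurrent operations could multiply the intended writer limit. A
self-contained illustration of the shared-pool behavior (plain Go, not
Arvados code):

    package main

    import (
            "fmt"
            "sync"
    )

    func main() {
            const limit = 4                   // cf. concurrentWriters
            sem := make(chan struct{}, limit) // the one shared throttle
            var mu sync.Mutex
            cur, peak := 0, 0
            var wg sync.WaitGroup
            // A mix of "background" and "flush" jobs all share sem.
            for i := 0; i < 20; i++ {
                    wg.Add(1)
                    go func() {
                            defer wg.Done()
                            sem <- struct{}{} // Acquire
                            mu.Lock()
                            cur++
                            if cur > peak {
                                    peak = cur
                            }
                            mu.Unlock()
                            // ... write a block to Keep here ...
                            mu.Lock()
                            cur--
                            mu.Unlock()
                            <-sem // Release
                    }()
            }
            wg.Wait()
            fmt.Printf("peak concurrency %d <= limit %d\n", peak, limit)
    }

Because all twenty jobs share sem, peak never exceeds the limit no
matter how the goroutines interleave.
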
diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index edc35ab9b..5f8d67510 100644
--- a/sdk/go/arvados/fs_collection_test.go
+++ b/sdk/go/arvados/fs_collection_test.go
@@ -1040,11 +1040,11 @@ func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
 }
 
 func (s *CollectionFSSuite) TestFlushFullBlocksWritingLongFile(c *check.C) {
-	defer func(wab, mbs int) {
-		writeAheadBlocks = wab
+	defer func(cw, mbs int) {
+		concurrentWriters = cw
 		maxBlockSize = mbs
-	}(writeAheadBlocks, maxBlockSize)
-	writeAheadBlocks = 2
+	}(concurrentWriters, maxBlockSize)
+	concurrentWriters = 2
 	maxBlockSize = 1024
 
 	proceed := make(chan struct{})
@@ -1069,7 +1069,7 @@ func (s *CollectionFSSuite) TestFlushFullBlocksWritingLongFile(c *check.C) {
 		default:
 			time.Sleep(time.Millisecond)
 		}
-		c.Check(atomic.AddInt32(&concurrent, -1) < int32(writeAheadBlocks), check.Equals, true)
+		c.Check(atomic.AddInt32(&concurrent, -1) < int32(concurrentWriters), check.Equals, true)
 	}
 
 	fs, err := (&Collection{}).FileSystem(s.client, s.kc)
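
The updated assertion checks the writer bound at the moment each fake
Keep write finishes: the stub increments an atomic in-flight counter
on entry, and the post-decrement value must stay below
concurrentWriters, i.e. at most concurrentWriters writes were ever in
flight at once. A generic version of that pattern (hypothetical names,
not the actual test stub):

    package fsdemo

    import "sync/atomic"

    // inFlight counts concurrent checkedPut calls.
    var inFlight int32

    // checkedPut runs put() and reports whether the number of calls
    // in flight (including this one) stayed within limit.
    func checkedPut(limit int32, put func()) bool {
            n := atomic.AddInt32(&inFlight, 1)
            defer atomic.AddInt32(&inFlight, -1)
            put()
            return n <= limit
    }
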
diff --git a/sdk/go/arvados/fs_site.go b/sdk/go/arvados/fs_site.go
index 82114e2ea..4264be4fa 100644
--- a/sdk/go/arvados/fs_site.go
+++ b/sdk/go/arvados/fs_site.go
@@ -21,6 +21,7 @@ type CustomFileSystem interface {
 type customFileSystem struct {
 	fileSystem
 	root *vdirnode
+	thr  *throttle
 
 	staleThreshold time.Time
 	staleLock      sync.Mutex
@@ -33,6 +34,7 @@ func (c *Client) CustomFileSystem(kc keepClient) CustomFileSystem {
 		fileSystem: fileSystem{
 			fsBackend: keepBackend{apiClient: c, keepClient: kc},
 			root:      root,
+			thr:       newThrottle(concurrentWriters),
 		},
 	}
 	root.inode = &treenode{

-----------------------------------------------------------------------


hooks/post-receive
-- 