[ARVADOS] updated: 1.2.0-469-ga88f7ad97

Git user git at public.curoverse.com
Thu Nov 29 15:52:56 EST 2018


Summary of changes:
 sdk/go/arvados/fs_collection.go      | 115 +++++++++++++++++++----------------
 sdk/go/arvados/fs_collection_test.go |  47 ++++++++++++--
 2 files changed, 105 insertions(+), 57 deletions(-)

       via  a88f7ad9728ee6968367928c6d3d7613bbf290ec (commit)
       via  a45b256a3261fff8f168321c02e61b94ae9b4a64 (commit)
       via  50f8d8487ad5156058087438b670d7c6f8a8d718 (commit)
       via  f929cc124fc1fde8f1a6a12679e327b34aa88bde (commit)
       via  478fb1838aea03ebad17b66926ef03aac536707b (commit)
      from  e36e81150649a6457c9cbf0101130cfdb776336f (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so they are listed
in full below.


commit a88f7ad9728ee6968367928c6d3d7613bbf290ec
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Nov 29 15:41:50 2018 -0500

    14538: Fix racy tests.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index 8567a830c..2ae2bd892 100644
--- a/sdk/go/arvados/fs_collection_test.go
+++ b/sdk/go/arvados/fs_collection_test.go
@@ -19,6 +19,7 @@ import (
 	"runtime"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -31,6 +32,7 @@ var _ = check.Suite(&CollectionFSSuite{})
 type keepClientStub struct {
 	blocks      map[string][]byte
 	refreshable map[string]bool
+	onPut       func(bufcopy []byte) // called from PutB, before acquiring lock
 	sync.RWMutex
 }
 
@@ -50,6 +52,9 @@ func (kcs *keepClientStub) PutB(p []byte) (string, int, error) {
 	locator := fmt.Sprintf("%x+%d+A12345 at abcde", md5.Sum(p), len(p))
 	buf := make([]byte, len(p))
 	copy(buf, p)
+	if kcs.onPut != nil {
+		kcs.onPut(buf)
+	}
 	kcs.Lock()
 	defer kcs.Unlock()
 	kcs.blocks[locator[:32]] = buf
@@ -618,11 +623,18 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
 				c.Check(string(buf), check.Equals, string(expect))
 				c.Check(err, check.IsNil)
 			}
-			s.checkMemSize(c, f)
 		}(n)
 	}
 	wg.Wait()
 
+	for n := 0; n < ngoroutines; n++ {
+		f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDONLY, 0)
+		c.Assert(err, check.IsNil)
+		f.(*filehandle).inode.(*filenode).waitPrune()
+		s.checkMemSize(c, f)
+		defer f.Close()
+	}
+
 	root, err := s.fs.Open("/")
 	c.Assert(err, check.IsNil)
 	defer root.Close()
@@ -1029,8 +1041,37 @@ func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
 }
 
 func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
+	defer func(wab, mbs int) {
+		writeAheadBlocks = wab
+		maxBlockSize = mbs
+	}(writeAheadBlocks, maxBlockSize)
+	writeAheadBlocks = 2
 	maxBlockSize = 1024
-	defer func() { maxBlockSize = 2 << 26 }()
+
+	proceed := make(chan struct{})
+	var started, concurrent int32
+	blk2done := false
+	s.kc.onPut = func([]byte) {
+		atomic.AddInt32(&concurrent, 1)
+		switch atomic.AddInt32(&started, 1) {
+		case 1:
+			// Wait until block 2 starts and finishes, and block 3 starts
+			select {
+			case <-proceed:
+				c.Check(blk2done, check.Equals, true)
+			case <-time.After(time.Second):
+				c.Error("timed out")
+			}
+		case 2:
+			time.Sleep(time.Millisecond)
+			blk2done = true
+		case 3:
+			close(proceed)
+		default:
+			time.Sleep(time.Millisecond)
+		}
+		c.Check(atomic.AddInt32(&concurrent, -1) < int32(writeAheadBlocks), check.Equals, true)
+	}
 
 	fs, err := (&Collection{}).FileSystem(s.client, s.kc)
 	c.Assert(err, check.IsNil)
@@ -1056,8 +1097,6 @@ func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
 		}
 		return
 	}
-
-	c.Check(len(currentMemExtents()) <= writeAheadBlocks+1, check.Equals, true)
 	f.(*filehandle).inode.(*filenode).waitPrune()
 	c.Check(currentMemExtents(), check.HasLen, 1)
 

commit a45b256a3261fff8f168321c02e61b94ae9b4a64
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Nov 29 15:15:42 2018 -0500

    14538: Split inline func from sync() into commitBlock().
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index 6a9a00aa7..6644f4cfb 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -597,60 +597,67 @@ func (dn *dirnode) Child(name string, replace func(inode) (inode, error)) (inode
 	return dn.treenode.Child(name, replace)
 }
 
+type fnSegmentRef struct {
+	fn  *filenode
+	idx int
+}
+
+// commitBlock concatenates the data from the given filenode segments
+// (which must be *memSegments), writes the data out to Keep as a
+// single block, and replaces the filenodes' *memSegments with
+// storedSegments that reference the relevant portions of the new
+// block.
+//
+// Caller must have write lock.
+func (dn *dirnode) commitBlock(ctx context.Context, throttle *throttle, refs []fnSegmentRef) error {
+	if len(refs) == 0 {
+		return nil
+	}
+	throttle.Acquire()
+	defer throttle.Release()
+	if err := ctx.Err(); err != nil {
+		return err
+	}
+	block := make([]byte, 0, maxBlockSize)
+	for _, ref := range refs {
+		block = append(block, ref.fn.segments[ref.idx].(*memSegment).buf...)
+	}
+	locator, _, err := dn.fs.PutB(block)
+	if err != nil {
+		return err
+	}
+	off := 0
+	for _, ref := range refs {
+		data := ref.fn.segments[ref.idx].(*memSegment).buf
+		ref.fn.segments[ref.idx] = storedSegment{
+			kc:      dn.fs,
+			locator: locator,
+			size:    len(block),
+			offset:  off,
+			length:  len(data),
+		}
+		off += len(data)
+		ref.fn.memsize -= int64(len(data))
+	}
+	return nil
+}
+
 // sync flushes in-memory data and remote block references (for the
 // children with the given names, which must be children of dn) to
 // local persistent storage. Caller must have write lock on dn and the
 // named children.
-func (dn *dirnode) sync(ctx context.Context, names []string, throttle *throttle) error {
+func (dn *dirnode) sync(ctx context.Context, throttle *throttle, names []string) error {
 	cg := newContextGroup(ctx)
 	defer cg.Cancel()
 
-	type shortBlock struct {
-		fn  *filenode
-		idx int
-	}
-
-	flush := func(sbs []shortBlock) error {
-		if len(sbs) == 0 {
-			return nil
-		}
-		throttle.Acquire()
-		defer throttle.Release()
-		if err := cg.Context().Err(); err != nil {
-			return err
-		}
-		block := make([]byte, 0, maxBlockSize)
-		for _, sb := range sbs {
-			block = append(block, sb.fn.segments[sb.idx].(*memSegment).buf...)
-		}
-		locator, _, err := dn.fs.PutB(block)
-		if err != nil {
-			return err
-		}
-		off := 0
-		for _, sb := range sbs {
-			data := sb.fn.segments[sb.idx].(*memSegment).buf
-			sb.fn.segments[sb.idx] = storedSegment{
-				kc:      dn.fs,
-				locator: locator,
-				size:    len(block),
-				offset:  off,
-				length:  len(data),
-			}
-			off += len(data)
-			sb.fn.memsize -= int64(len(data))
-		}
-		return nil
-	}
-
-	goFlush := func(sbs []shortBlock) {
+	goCommit := func(refs []fnSegmentRef) {
 		cg.Go(func() error {
-			return flush(sbs)
+			return dn.commitBlock(cg.Context(), throttle, refs)
 		})
 	}
 
-	var pending []shortBlock
-	var pendingLen int
+	var pending []fnSegmentRef
+	var pendingLen int = 0
 	localLocator := map[string]string{}
 	for _, name := range names {
 		fn, ok := dn.inodes[name].(*filenode)
@@ -673,22 +680,22 @@ func (dn *dirnode) sync(ctx context.Context, names []string, throttle *throttle)
 				fn.segments[idx] = seg
 			case *memSegment:
 				if seg.Len() > maxBlockSize/2 {
-					goFlush([]shortBlock{{fn, idx}})
+					goCommit([]fnSegmentRef{{fn, idx}})
 					continue
 				}
 				if pendingLen+seg.Len() > maxBlockSize {
-					goFlush(pending)
+					goCommit(pending)
 					pending = nil
 					pendingLen = 0
 				}
-				pending = append(pending, shortBlock{fn, idx})
+				pending = append(pending, fnSegmentRef{fn, idx})
 				pendingLen += seg.Len()
 			default:
 				panic(fmt.Sprintf("can't sync segment type %T", seg))
 			}
 		}
 	}
-	goFlush(pending)
+	goCommit(pending)
 	return cg.Wait()
 }
 
@@ -760,7 +767,7 @@ func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle
 
 		var fileparts []filepart
 		var blocks []string
-		if err := dn.sync(cg.Context(), names, throttle); err != nil {
+		if err := dn.sync(cg.Context(), throttle, names); err != nil {
 			return err
 		}
 		for _, name := range filenames {

commit 50f8d8487ad5156058087438b670d7c6f8a8d718
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Nov 29 15:12:25 2018 -0500

    14538: Reduce max writeahead to 1.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index bc4e4acfa..6a9a00aa7 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -19,9 +19,11 @@ import (
 	"time"
 )
 
-var maxBlockSize = 1 << 26
-
-var concurrentWriters = 4
+var (
+	maxBlockSize      = 1 << 26
+	concurrentWriters = 4 // max goroutines writing to Keep during sync()
+	writeAheadBlocks  = 1 // max background jobs flushing to Keep before blocking writes
+)
 
 // A CollectionFileSystem is a FileSystem that can be serialized as a
 // manifest and stored as a collection.
@@ -498,7 +500,7 @@ func (fn *filenode) pruneMemSegments() {
 	// TODO: pack/flush small blocks too, when fragmented
 	if fn.throttle == nil {
 		// TODO: share a throttle with filesystem
-		fn.throttle = newThrottle(concurrentWriters)
+		fn.throttle = newThrottle(writeAheadBlocks)
 	}
 	for idx, seg := range fn.segments {
 		seg, ok := seg.(*memSegment)
diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index 73f7efda3..8567a830c 100644
--- a/sdk/go/arvados/fs_collection_test.go
+++ b/sdk/go/arvados/fs_collection_test.go
@@ -1057,7 +1057,7 @@ func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
 		return
 	}
 
-	c.Check(len(currentMemExtents()) <= concurrentWriters+1, check.Equals, true)
+	c.Check(len(currentMemExtents()) <= writeAheadBlocks+1, check.Equals, true)
 	f.(*filehandle).inode.(*filenode).waitPrune()
 	c.Check(currentMemExtents(), check.HasLen, 1)
 

commit f929cc124fc1fde8f1a6a12679e327b34aa88bde
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Nov 29 14:34:25 2018 -0500

    14538: Remove redundant condition.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index cd3dcf053..bc4e4acfa 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -502,7 +502,7 @@ func (fn *filenode) pruneMemSegments() {
 	}
 	for idx, seg := range fn.segments {
 		seg, ok := seg.(*memSegment)
-		if !ok || seg.Len() < maxBlockSize || seg.Len() == 0 || seg.flushing != nil {
+		if !ok || seg.Len() < maxBlockSize || seg.flushing != nil {
 			continue
 		}
 		// Setting seg.flushing guarantees seg.buf will not be

commit 478fb1838aea03ebad17b66926ef03aac536707b
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Nov 29 14:33:22 2018 -0500

    14538: Fix test.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index 18cf47b1d..73f7efda3 100644
--- a/sdk/go/arvados/fs_collection_test.go
+++ b/sdk/go/arvados/fs_collection_test.go
@@ -1057,7 +1057,7 @@ func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
 		return
 	}
 
-	c.Check(len(currentMemExtents()) <= concurrentWriters, check.Equals, true)
+	c.Check(len(currentMemExtents()) <= concurrentWriters+1, check.Equals, true)
 	f.(*filehandle).inode.(*filenode).waitPrune()
 	c.Check(currentMemExtents(), check.HasLen, 1)
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list