[ARVADOS] updated: 1.4.2-4-g89f1571a2

Git user git at public.curoverse.com
Thu Dec 5 15:46:19 UTC 2019


Summary of changes:
 sdk/go/arvados/fs_collection.go      | 13 +++++++++++--
 sdk/go/arvados/fs_collection_test.go | 24 ++++++++++++++++++++++++
 2 files changed, 35 insertions(+), 2 deletions(-)

       via  89f1571a26a5c90c79d241d049df73fb16b90954 (commit)
      from  b702e93a96fededd1cfb46e862f9209596063156 (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit 89f1571a26a5c90c79d241d049df73fb16b90954
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Wed Dec 4 11:28:38 2019 -0500

    15910: Fix races in collectionfs flush/sync.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index 578d73f5a..d206cb3b2 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -16,6 +16,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 )
 
@@ -688,6 +689,7 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize
 		}
 		segs = append(segs, seg)
 	}
+	blocksize := len(block)
 	dn.fs.throttle().Acquire()
 	errs := make(chan error, 1)
 	go func() {
@@ -698,6 +700,8 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize
 		dn.fs.throttle().Release()
 		{
 			if !sync {
+				dn.Lock()
+				defer dn.Unlock()
 				for _, name := range dn.sortedNames() {
 					if fn, ok := dn.inodes[name].(*filenode); ok {
 						fn.Lock()
@@ -746,11 +750,16 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize
 			ref.fn.segments[ref.idx] = storedSegment{
 				kc:      dn.fs,
 				locator: locator,
-				size:    len(block),
+				size:    blocksize,
 				offset:  offsets[idx],
 				length:  len(data),
 			}
-			ref.fn.memsize -= int64(len(data))
+			// atomic is needed here despite caller having
+			// lock: caller might be running concurrent
+			// commitBlock() goroutines using the same
+			// lock, writing different segments from the
+			// same file.
+			atomic.AddInt64(&ref.fn.memsize, -int64(len(data)))
 		}
 	}()
 	if sync {
diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index e5cea0639..6ef7627db 100644
--- a/sdk/go/arvados/fs_collection_test.go
+++ b/sdk/go/arvados/fs_collection_test.go
@@ -1281,6 +1281,30 @@ func (s *CollectionFSSuite) TestMaxUnflushed(c *check.C) {
 	fs.Flush("", true)
 }
 
+func (s *CollectionFSSuite) TestFlushShort(c *check.C) {
+	s.kc.onPut = func([]byte) {
+		s.kc.Lock()
+		s.kc.blocks = map[string][]byte{}
+		s.kc.Unlock()
+	}
+	fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+	c.Assert(err, check.IsNil)
+	for _, blocksize := range []int{8, 1000000} {
+		dir := fmt.Sprintf("dir%d", blocksize)
+		err = fs.Mkdir(dir, 0755)
+		c.Assert(err, check.IsNil)
+		data := make([]byte, blocksize)
+		for i := 0; i < 100; i++ {
+			f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, i), os.O_WRONLY|os.O_CREATE, 0)
+			c.Assert(err, check.IsNil)
+			_, err = f.Write(data)
+			c.Assert(err, check.IsNil)
+			f.Close()
+			fs.Flush(dir, false)
+		}
+	}
+}
+
 func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
 	for _, txt := range []string{
 		"\n",

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list