[ARVADOS] created: 1.3.0-1958-g5f9e11ae9
Git user
git at public.curoverse.com
Wed Dec 4 16:30:26 UTC 2019
at 5f9e11ae95dc2b768ccc7e5f165be06b6baabdb2 (commit)
commit 5f9e11ae95dc2b768ccc7e5f165be06b6baabdb2
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Wed Dec 4 11:28:38 2019 -0500
15910: Fix races in collectionfs flush/sync.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index 40c890802..3d0928b84 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -16,6 +16,7 @@ import (
"strconv"
"strings"
"sync"
+ "sync/atomic"
"time"
)
@@ -691,6 +692,7 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize
}
segs = append(segs, seg)
}
+ blocksize := len(block)
dn.fs.throttle().Acquire()
errs := make(chan error, 1)
go func() {
@@ -701,6 +703,8 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize
dn.fs.throttle().Release()
{
if !sync {
+ dn.Lock()
+ defer dn.Unlock()
for _, name := range dn.sortedNames() {
if fn, ok := dn.inodes[name].(*filenode); ok {
fn.Lock()
@@ -749,11 +753,16 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize
ref.fn.segments[ref.idx] = storedSegment{
kc: dn.fs,
locator: locator,
- size: len(block),
+ size: blocksize,
offset: offsets[idx],
length: len(data),
}
- ref.fn.memsize -= int64(len(data))
+ // atomic is needed here despite caller having
+ // lock: caller might be running concurrent
+ // commitBlock() goroutines using the same
+ // lock, writing different segments from the
+ // same file.
+ atomic.AddInt64(&ref.fn.memsize, -int64(len(data)))
}
}()
if sync {
diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index 352b226bf..49fdc397d 100644
--- a/sdk/go/arvados/fs_collection_test.go
+++ b/sdk/go/arvados/fs_collection_test.go
@@ -1280,6 +1280,30 @@ func (s *CollectionFSSuite) TestMaxUnflushed(c *check.C) {
fs.Flush("", true)
}
+func (s *CollectionFSSuite) TestFlushShort(c *check.C) { // TestFlushShort writes 100 files at each of two sizes, calling Flush(dir, false) after every file.
+ s.kc.onPut = func([]byte) { // hook on s.kc: swaps s.kc.blocks for an empty map while holding s.kc's lock (presumably invoked on each block write -- confirm against the stub)
+ s.kc.Lock()
+ s.kc.blocks = map[string][]byte{}
+ s.kc.Unlock()
+ }
+ fs, err := (&Collection{}).FileSystem(s.client, s.kc) // filesystem built from an empty Collection
+ c.Assert(err, check.IsNil)
+ for _, blocksize := range []int{8, 1000000} { // one very small and one large payload size, in bytes
+ dir := fmt.Sprintf("dir%d", blocksize) // separate directory per size: "dir8", "dir1000000"
+ err = fs.Mkdir(dir, 0755)
+ c.Assert(err, check.IsNil)
+ data := make([]byte, blocksize) // zero-filled payload, reused for every file in this directory
+ for i := 0; i < 100; i++ {
+ f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, i), os.O_WRONLY|os.O_CREATE, 0) // err here shadows the outer err for this iteration
+ c.Assert(err, check.IsNil)
+ _, err = f.Write(data)
+ c.Assert(err, check.IsNil)
+ f.Close() // NOTE(review): any value returned by Close is discarded
+ fs.Flush(dir, false) // flush only this directory after each file; NOTE(review): result is not checked, and the meaning of the false argument is not visible here
+ }
+ }
+}
+
func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
for _, txt := range []string{
"\n",
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list