[ARVADOS] created: 1.3.0-1998-g4ff934bda

Git user git at public.arvados.org
Sat Dec 21 21:25:06 UTC 2019


        at  4ff934bda68d59295e5ba99a198380248da6b8f3 (commit)


commit 4ff934bda68d59295e5ba99a198380248da6b8f3
Author: Tom Clegg <tom at tomclegg.ca>
Date:   Sat Dec 21 16:22:29 2019 -0500

    15942: Fix deadlock caused by unclosed "done" channel.
    
    Also, stop relying on the flushing goroutine to set flushing=nil when
    finished; closing the channel is now enough to indicate work is done.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>

diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index d0e97f2ad..37bd49491 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -568,7 +568,6 @@ func (fn *filenode) pruneMemSegments() {
 				// A new seg.buf has been allocated.
 				return
 			}
-			seg.flushing = nil
 			if err != nil {
 				// TODO: stall (or return errors from)
 				// subsequent writes until flushing
@@ -671,9 +670,10 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize
 	offsets := make([]int, 0, len(refs)) // location of segment's data within block
 	for _, ref := range refs {
 		seg := ref.fn.segments[ref.idx].(*memSegment)
-		if seg.flushing != nil && !sync {
+		if !sync && seg.flushingUnfinished() {
 			// Let the other flushing goroutine finish. If
 			// it fails, we'll try again next time.
+			close(done)
 			return nil
 		} else {
 			// In sync mode, we proceed regardless of
@@ -700,15 +700,6 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize
 		defer close(errs)
 		locator, _, err := dn.fs.PutB(block)
 		dn.fs.throttle().Release()
-		{
-			defer func() {
-				for _, seg := range segs {
-					if seg.flushing == done {
-						seg.flushing = nil
-					}
-				}
-			}()
-		}
 		if err != nil {
 			errs <- err
 			return
@@ -1198,13 +1189,26 @@ type segment interface {
 
 type memSegment struct {
 	buf []byte
-	// If flushing is not nil, then a) buf is being shared by a
-	// pruneMemSegments goroutine, and must be copied on write;
-	// and b) the flushing channel will close when the goroutine
-	// finishes, whether it succeeds or not.
+	// If flushing is not nil and not ready/closed, then a) buf is
+	// being shared by a pruneMemSegments goroutine, and must be
+	// copied on write; and b) the flushing channel will close
+	// when the goroutine finishes, whether it succeeds or not.
 	flushing <-chan struct{}
 }
 
+func (me *memSegment) flushingUnfinished() bool {
+	if me.flushing == nil {
+		return false
+	}
+	select {
+	case <-me.flushing:
+		me.flushing = nil
+		return false
+	default:
+		return true
+	}
+}
+
 func (me *memSegment) Len() int {
 	return len(me.buf)
 }
diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index 49fdc397d..a32a4a304 100644
--- a/sdk/go/arvados/fs_collection_test.go
+++ b/sdk/go/arvados/fs_collection_test.go
@@ -17,6 +17,7 @@ import (
 	"os"
 	"regexp"
 	"runtime"
+	"runtime/pprof"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -1274,12 +1275,55 @@ func (s *CollectionFSSuite) TestMaxUnflushed(c *check.C) {
 					fs.Flush(dir, false)
 				}(j)
 			}
+			_, err := fs.MarshalManifest(".")
+			c.Check(err, check.IsNil)
 		}()
 	}
 	owg.Wait()
 	fs.Flush("", true)
 }
 
+func (s *CollectionFSSuite) TestFlushStress(c *check.C) {
+	done := false
+	defer func() { done = true }()
+	time.AfterFunc(10*time.Second, func() {
+		if !done {
+			pprof.Lookup("goroutine").WriteTo(os.Stderr, 1)
+			panic("timeout")
+		}
+	})
+
+	wrote := 0
+	s.kc.onPut = func(p []byte) {
+		s.kc.Lock()
+		s.kc.blocks = map[string][]byte{}
+		wrote++
+		defer c.Logf("wrote block %d, %d bytes", wrote, len(p))
+		s.kc.Unlock()
+		time.Sleep(20 * time.Millisecond)
+	}
+
+	fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+	c.Assert(err, check.IsNil)
+
+	data := make([]byte, 1<<20)
+	for i := 0; i < 3; i++ {
+		dir := fmt.Sprintf("dir%d", i)
+		fs.Mkdir(dir, 0755)
+		for j := 0; j < 200; j++ {
+			data[0] = byte(j)
+			f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
+			c.Assert(err, check.IsNil)
+			_, err = f.Write(data)
+			c.Assert(err, check.IsNil)
+			f.Close()
+			fs.Flush(dir, false)
+		}
+		_, err := fs.MarshalManifest(".")
+		c.Check(err, check.IsNil)
+	}
+}
+
 func (s *CollectionFSSuite) TestFlushShort(c *check.C) {
 	s.kc.onPut = func([]byte) {
 		s.kc.Lock()
@@ -1301,6 +1345,9 @@ func (s *CollectionFSSuite) TestFlushShort(c *check.C) {
 			f.Close()
 			fs.Flush(dir, false)
 		}
+		fs.Flush(dir, true)
+		_, err := fs.MarshalManifest(".")
+		c.Check(err, check.IsNil)
 	}
 }
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list