[ARVADOS] updated: 1.2.0-469-ga88f7ad97
Git user
git at public.curoverse.com
Thu Nov 29 15:52:56 EST 2018
Summary of changes:
sdk/go/arvados/fs_collection.go | 115 +++++++++++++++++++----------------
sdk/go/arvados/fs_collection_test.go | 47 ++++++++++++--
2 files changed, 105 insertions(+), 57 deletions(-)
via a88f7ad9728ee6968367928c6d3d7613bbf290ec (commit)
via a45b256a3261fff8f168321c02e61b94ae9b4a64 (commit)
via 50f8d8487ad5156058087438b670d7c6f8a8d718 (commit)
via f929cc124fc1fde8f1a6a12679e327b34aa88bde (commit)
via 478fb1838aea03ebad17b66926ef03aac536707b (commit)
from e36e81150649a6457c9cbf0101130cfdb776336f (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit a88f7ad9728ee6968367928c6d3d7613bbf290ec
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Thu Nov 29 15:41:50 2018 -0500
14538: Fix racy tests.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index 8567a830c..2ae2bd892 100644
--- a/sdk/go/arvados/fs_collection_test.go
+++ b/sdk/go/arvados/fs_collection_test.go
@@ -19,6 +19,7 @@ import (
"runtime"
"strings"
"sync"
+ "sync/atomic"
"testing"
"time"
@@ -31,6 +32,7 @@ var _ = check.Suite(&CollectionFSSuite{})
type keepClientStub struct {
blocks map[string][]byte
refreshable map[string]bool
+ onPut func(bufcopy []byte) // called from PutB, before acquiring lock
sync.RWMutex
}
@@ -50,6 +52,9 @@ func (kcs *keepClientStub) PutB(p []byte) (string, int, error) {
locator := fmt.Sprintf("%x+%d+A12345@abcde", md5.Sum(p), len(p))
buf := make([]byte, len(p))
copy(buf, p)
+ if kcs.onPut != nil {
+ kcs.onPut(buf)
+ }
kcs.Lock()
defer kcs.Unlock()
kcs.blocks[locator[:32]] = buf
@@ -618,11 +623,18 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
c.Check(string(buf), check.Equals, string(expect))
c.Check(err, check.IsNil)
}
- s.checkMemSize(c, f)
}(n)
}
wg.Wait()
+ for n := 0; n < ngoroutines; n++ {
+ f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDONLY, 0)
+ c.Assert(err, check.IsNil)
+ f.(*filehandle).inode.(*filenode).waitPrune()
+ s.checkMemSize(c, f)
+ defer f.Close()
+ }
+
root, err := s.fs.Open("/")
c.Assert(err, check.IsNil)
defer root.Close()
@@ -1029,8 +1041,37 @@ func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
}
func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
+ defer func(wab, mbs int) {
+ writeAheadBlocks = wab
+ maxBlockSize = mbs
+ }(writeAheadBlocks, maxBlockSize)
+ writeAheadBlocks = 2
maxBlockSize = 1024
- defer func() { maxBlockSize = 2 << 26 }()
+
+ proceed := make(chan struct{})
+ var started, concurrent int32
+ blk2done := false
+ s.kc.onPut = func([]byte) {
+ atomic.AddInt32(&concurrent, 1)
+ switch atomic.AddInt32(&started, 1) {
+ case 1:
+ // Wait until block 2 starts and finishes, and block 3 starts
+ select {
+ case <-proceed:
+ c.Check(blk2done, check.Equals, true)
+ case <-time.After(time.Second):
+ c.Error("timed out")
+ }
+ case 2:
+ time.Sleep(time.Millisecond)
+ blk2done = true
+ case 3:
+ close(proceed)
+ default:
+ time.Sleep(time.Millisecond)
+ }
+ c.Check(atomic.AddInt32(&concurrent, -1) < int32(writeAheadBlocks), check.Equals, true)
+ }
fs, err := (&Collection{}).FileSystem(s.client, s.kc)
c.Assert(err, check.IsNil)
@@ -1056,8 +1097,6 @@ func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
}
return
}
-
- c.Check(len(currentMemExtents()) <= writeAheadBlocks+1, check.Equals, true)
f.(*filehandle).inode.(*filenode).waitPrune()
c.Check(currentMemExtents(), check.HasLen, 1)
commit a45b256a3261fff8f168321c02e61b94ae9b4a64
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Thu Nov 29 15:15:42 2018 -0500
14538: Split inline func from sync() into commitBlock().
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index 6a9a00aa7..6644f4cfb 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -597,60 +597,67 @@ func (dn *dirnode) Child(name string, replace func(inode) (inode, error)) (inode
return dn.treenode.Child(name, replace)
}
+type fnSegmentRef struct {
+ fn *filenode
+ idx int
+}
+
+// commitBlock concatenates the data from the given filenode segments
+// (which must be *memSegments), writes the data out to Keep as a
+// single block, and replaces the filenodes' *memSegments with
+// storedSegments that reference the relevant portions of the new
+// block.
+//
+// Caller must have write lock.
+func (dn *dirnode) commitBlock(ctx context.Context, throttle *throttle, refs []fnSegmentRef) error {
+ if len(refs) == 0 {
+ return nil
+ }
+ throttle.Acquire()
+ defer throttle.Release()
+ if err := ctx.Err(); err != nil {
+ return err
+ }
+ block := make([]byte, 0, maxBlockSize)
+ for _, ref := range refs {
+ block = append(block, ref.fn.segments[ref.idx].(*memSegment).buf...)
+ }
+ locator, _, err := dn.fs.PutB(block)
+ if err != nil {
+ return err
+ }
+ off := 0
+ for _, ref := range refs {
+ data := ref.fn.segments[ref.idx].(*memSegment).buf
+ ref.fn.segments[ref.idx] = storedSegment{
+ kc: dn.fs,
+ locator: locator,
+ size: len(block),
+ offset: off,
+ length: len(data),
+ }
+ off += len(data)
+ ref.fn.memsize -= int64(len(data))
+ }
+ return nil
+}
+
// sync flushes in-memory data and remote block references (for the
// children with the given names, which must be children of dn) to
// local persistent storage. Caller must have write lock on dn and the
// named children.
-func (dn *dirnode) sync(ctx context.Context, names []string, throttle *throttle) error {
+func (dn *dirnode) sync(ctx context.Context, throttle *throttle, names []string) error {
cg := newContextGroup(ctx)
defer cg.Cancel()
- type shortBlock struct {
- fn *filenode
- idx int
- }
-
- flush := func(sbs []shortBlock) error {
- if len(sbs) == 0 {
- return nil
- }
- throttle.Acquire()
- defer throttle.Release()
- if err := cg.Context().Err(); err != nil {
- return err
- }
- block := make([]byte, 0, maxBlockSize)
- for _, sb := range sbs {
- block = append(block, sb.fn.segments[sb.idx].(*memSegment).buf...)
- }
- locator, _, err := dn.fs.PutB(block)
- if err != nil {
- return err
- }
- off := 0
- for _, sb := range sbs {
- data := sb.fn.segments[sb.idx].(*memSegment).buf
- sb.fn.segments[sb.idx] = storedSegment{
- kc: dn.fs,
- locator: locator,
- size: len(block),
- offset: off,
- length: len(data),
- }
- off += len(data)
- sb.fn.memsize -= int64(len(data))
- }
- return nil
- }
-
- goFlush := func(sbs []shortBlock) {
+ goCommit := func(refs []fnSegmentRef) {
cg.Go(func() error {
- return flush(sbs)
+ return dn.commitBlock(cg.Context(), throttle, refs)
})
}
- var pending []shortBlock
- var pendingLen int
+ var pending []fnSegmentRef
+ var pendingLen int = 0
localLocator := map[string]string{}
for _, name := range names {
fn, ok := dn.inodes[name].(*filenode)
@@ -673,22 +680,22 @@ func (dn *dirnode) sync(ctx context.Context, names []string, throttle *throttle)
fn.segments[idx] = seg
case *memSegment:
if seg.Len() > maxBlockSize/2 {
- goFlush([]shortBlock{{fn, idx}})
+ goCommit([]fnSegmentRef{{fn, idx}})
continue
}
if pendingLen+seg.Len() > maxBlockSize {
- goFlush(pending)
+ goCommit(pending)
pending = nil
pendingLen = 0
}
- pending = append(pending, shortBlock{fn, idx})
+ pending = append(pending, fnSegmentRef{fn, idx})
pendingLen += seg.Len()
default:
panic(fmt.Sprintf("can't sync segment type %T", seg))
}
}
}
- goFlush(pending)
+ goCommit(pending)
return cg.Wait()
}
@@ -760,7 +767,7 @@ func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle
var fileparts []filepart
var blocks []string
- if err := dn.sync(cg.Context(), names, throttle); err != nil {
+ if err := dn.sync(cg.Context(), throttle, names); err != nil {
return err
}
for _, name := range filenames {
commit 50f8d8487ad5156058087438b670d7c6f8a8d718
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Thu Nov 29 15:12:25 2018 -0500
14538: Reduce max writeahead to 1.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index bc4e4acfa..6a9a00aa7 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -19,9 +19,11 @@ import (
"time"
)
-var maxBlockSize = 1 << 26
-
-var concurrentWriters = 4
+var (
+ maxBlockSize = 1 << 26
+ concurrentWriters = 4 // max goroutines writing to Keep during sync()
+ writeAheadBlocks = 1 // max background jobs flushing to Keep before blocking writes
+)
// A CollectionFileSystem is a FileSystem that can be serialized as a
// manifest and stored as a collection.
@@ -498,7 +500,7 @@ func (fn *filenode) pruneMemSegments() {
// TODO: pack/flush small blocks too, when fragmented
if fn.throttle == nil {
// TODO: share a throttle with filesystem
- fn.throttle = newThrottle(concurrentWriters)
+ fn.throttle = newThrottle(writeAheadBlocks)
}
for idx, seg := range fn.segments {
seg, ok := seg.(*memSegment)
diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index 73f7efda3..8567a830c 100644
--- a/sdk/go/arvados/fs_collection_test.go
+++ b/sdk/go/arvados/fs_collection_test.go
@@ -1057,7 +1057,7 @@ func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
return
}
- c.Check(len(currentMemExtents()) <= concurrentWriters+1, check.Equals, true)
+ c.Check(len(currentMemExtents()) <= writeAheadBlocks+1, check.Equals, true)
f.(*filehandle).inode.(*filenode).waitPrune()
c.Check(currentMemExtents(), check.HasLen, 1)
commit f929cc124fc1fde8f1a6a12679e327b34aa88bde
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Thu Nov 29 14:34:25 2018 -0500
14538: Remove redundant condition.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index cd3dcf053..bc4e4acfa 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -502,7 +502,7 @@ func (fn *filenode) pruneMemSegments() {
}
for idx, seg := range fn.segments {
seg, ok := seg.(*memSegment)
- if !ok || seg.Len() < maxBlockSize || seg.Len() == 0 || seg.flushing != nil {
+ if !ok || seg.Len() < maxBlockSize || seg.flushing != nil {
continue
}
// Setting seg.flushing guarantees seg.buf will not be
commit 478fb1838aea03ebad17b66926ef03aac536707b
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Thu Nov 29 14:33:22 2018 -0500
14538: Fix test.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index 18cf47b1d..73f7efda3 100644
--- a/sdk/go/arvados/fs_collection_test.go
+++ b/sdk/go/arvados/fs_collection_test.go
@@ -1057,7 +1057,7 @@ func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
return
}
- c.Check(len(currentMemExtents()) <= concurrentWriters, check.Equals, true)
+ c.Check(len(currentMemExtents()) <= concurrentWriters+1, check.Equals, true)
f.(*filehandle).inode.(*filenode).waitPrune()
c.Check(currentMemExtents(), check.HasLen, 1)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list