[ARVADOS] created: 1.3.0-1714-g5843ada49
Git user git at public.curoverse.com
Tue Oct 8 21:08:13 UTC 2019
at 5843ada490343587647f6de48ad3fd7927c4a0fc (commit)
commit 5843ada490343587647f6de48ad3fd7927c4a0fc
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Tue Oct 8 17:05:50 2019 -0400
15652: Add collectionfs Flush() method to control memory use.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
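
For context, below is a minimal sketch (not part of this commit) of how a caller might use the new Flush(shortBlocks) method to keep memory bounded while writing many small files. The package name, the writeMany function, and the data argument are illustrative; the import path reflects the module path of this era, and the Flush signature is the one introduced here. Treat it as a sketch of the intended usage pattern, not authoritative API documentation.

    package example

    import (
        "fmt"
        "os"

        "git.curoverse.com/arvados.git/sdk/go/arvados"
    )

    // writeMany writes one small file into each of 256 directories,
    // flushing full blocks every 8 directories so buffered data does
    // not pile up in memory, then flushes everything (including short
    // trailing blocks) at the end.
    func writeMany(cfs arvados.CollectionFileSystem, data []byte) error {
        for i := 0; i < 256; i++ {
            dir := fmt.Sprintf("dir%d", i)
            if err := cfs.Mkdir(dir, 0755); err != nil {
                return err
            }
            f, err := cfs.OpenFile(dir+"/file", os.O_WRONLY|os.O_CREATE, 0)
            if err != nil {
                return err
            }
            if _, err := f.Write(data); err != nil {
                f.Close()
                return err
            }
            if err := f.Close(); err != nil {
                return err
            }
            if i%8 == 0 {
                // Write out any full blocks, but leave short trailing
                // blocks buffered in case more data gets appended to
                // the same stream.
                if err := cfs.Flush(false); err != nil {
                    return err
                }
            }
        }
        // Final flush of everything, including short blocks.
        return cfs.Flush(true)
    }
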
diff --git a/sdk/go/arvados/fs_base.go b/sdk/go/arvados/fs_base.go
index 3058a7609..f5916f957 100644
--- a/sdk/go/arvados/fs_base.go
+++ b/sdk/go/arvados/fs_base.go
@@ -86,7 +86,17 @@ type FileSystem interface {
Remove(name string) error
RemoveAll(name string) error
Rename(oldname, newname string) error
+
+ // Write buffered data from memory to storage, returning when
+ // all updates have been saved to persistent storage.
Sync() error
+
+ // Write buffered data from memory to storage, but don't wait
+ // for all writes to finish before returning. If shortBlocks
+ // is true, flush everything; otherwise, if there's less than
+ // a full block of buffered data at the end of a stream, leave
+ // it buffered in memory in case more data can be appended.
+ Flush(shortBlocks bool) error
}
type inode interface {
@@ -560,6 +570,11 @@ func (fs *fileSystem) Sync() error {
return ErrInvalidOperation
}
+func (fs *fileSystem) Flush(bool) error {
+ log.Printf("TODO: flush fileSystem")
+ return ErrInvalidOperation
+}
+
// rlookup (recursive lookup) returns the inode for the file/directory
// with the given name (which may contain "/" separators). If no such
// file/directory exists, the returned node is nil.
diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index 972b3979f..c17dcf289 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -7,6 +7,7 @@ package arvados
import (
"context"
"encoding/json"
+ "errors"
"fmt"
"io"
"os"
@@ -21,7 +22,7 @@ import (
var (
maxBlockSize = 1 << 26
- concurrentWriters = 4 // max goroutines writing to Keep during sync()
+ concurrentWriters = 4 // max goroutines writing to Keep during flush()
writeAheadBlocks = 1 // max background jobs flushing to Keep before blocking writes
)
@@ -38,6 +39,9 @@ type CollectionFileSystem interface {
// Total data bytes in all files.
Size() int64
+
+ // Memory consumed by buffered file data.
+ memorySize() int64
}
type collectionFileSystem struct {
@@ -143,6 +147,19 @@ func (fs *collectionFileSystem) Sync() error {
return nil
}
+func (fs *collectionFileSystem) Flush(shortBlocks bool) error {
+ fs.fileSystem.root.Lock()
+ defer fs.fileSystem.root.Unlock()
+ dn := fs.fileSystem.root.(*dirnode)
+ return dn.flush(context.TODO(), newThrottle(concurrentWriters), dn.sortedNames(), flushOpts{sync: false, shortBlocks: shortBlocks})
+}
+
+func (fs *collectionFileSystem) memorySize() int64 {
+ fs.fileSystem.root.Lock()
+ defer fs.fileSystem.root.Unlock()
+ return fs.fileSystem.root.(*dirnode).memorySize()
+}
+
func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
fs.fileSystem.root.Lock()
defer fs.fileSystem.root.Unlock()
@@ -501,7 +518,7 @@ func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePt
// Write some data out to disk to reduce memory use. Caller must have
// write lock.
func (fn *filenode) pruneMemSegments() {
- // TODO: share code with (*dirnode)sync()
+ // TODO: share code with (*dirnode)flush()
// TODO: pack/flush small blocks too, when fragmented
if fn.throttle == nil {
// TODO: share a throttle with filesystem
@@ -556,8 +573,8 @@ func (fn *filenode) pruneMemSegments() {
}
}
-// Block until all pending pruneMemSegments work is finished. Caller
-// must NOT have lock.
+// Block until all pending pruneMemSegments/flush work is
+// finished. Caller must NOT have lock.
func (fn *filenode) waitPrune() {
var pending []<-chan struct{}
fn.Lock()
@@ -614,50 +631,107 @@ type fnSegmentRef struct {
// block.
//
// Caller must have write lock.
-func (dn *dirnode) commitBlock(ctx context.Context, throttle *throttle, refs []fnSegmentRef) error {
- if len(refs) == 0 {
- return nil
- }
- throttle.Acquire()
- defer throttle.Release()
+func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, sync bool) error {
if err := ctx.Err(); err != nil {
return err
}
+ done := make(chan struct{})
block := make([]byte, 0, maxBlockSize)
for _, ref := range refs {
- block = append(block, ref.fn.segments[ref.idx].(*memSegment).buf...)
- }
- locator, _, err := dn.fs.PutB(block)
- if err != nil {
- return err
+ seg := ref.fn.segments[ref.idx].(*memSegment)
+ if seg.flushing != nil {
+ if sync {
+ return errors.New("BUG: commitBlock(sync=true) called without calling waitPrune() first")
+ } else {
+ return nil
+ }
+ }
+ seg.flushing = done
+ block = append(block, seg.buf...)
}
- off := 0
- for _, ref := range refs {
- data := ref.fn.segments[ref.idx].(*memSegment).buf
- ref.fn.segments[ref.idx] = storedSegment{
- kc: dn.fs,
- locator: locator,
- size: len(block),
- offset: off,
- length: len(data),
+ errs := make(chan error, 1)
+ go func() {
+ defer close(done)
+ defer close(errs)
+ locator, _, err := dn.fs.PutB(block)
+ if err != nil {
+ errs <- err
+ return
+ }
+ locked := map[*filenode]bool{}
+ if !sync {
+ for _, name := range dn.sortedNames() {
+ if fn, ok := dn.inodes[name].(*filenode); ok {
+ fn.Lock()
+ defer fn.Unlock()
+ locked[fn] = true
+ }
+ }
+ }
+ off := 0
+ for _, ref := range refs {
+ if !sync {
+ if len(ref.fn.segments) <= ref.idx {
+ // file segments have been
+ // rearranged or changed in
+ // some way
+ continue
+ } else if seg, ok := ref.fn.segments[ref.idx].(*memSegment); !ok {
+ // segment has been replaced
+ continue
+ } else if seg.flushing != done {
+ // seg.buf has been replaced
+ continue
+ } else if !locked[ref.fn] {
+ // file was renamed, moved, or
+ // deleted since we called
+ // PutB
+ continue
+ }
+ }
+ data := ref.fn.segments[ref.idx].(*memSegment).buf
+ ref.fn.segments[ref.idx] = storedSegment{
+ kc: dn.fs,
+ locator: locator,
+ size: len(block),
+ offset: off,
+ length: len(data),
+ }
+ off += len(data)
+ ref.fn.memsize -= int64(len(data))
}
- off += len(data)
- ref.fn.memsize -= int64(len(data))
+ }()
+ if sync {
+ return <-errs
+ } else {
+ return nil
}
- return nil
}
-// sync flushes in-memory data and remote block references (for the
+type flushOpts struct {
+ sync bool
+ shortBlocks bool
+}
+
+// flush in-memory data and remote-cluster block references (for the
// children with the given names, which must be children of dn) to
-// local persistent storage. Caller must have write lock on dn and the
-// named children.
-func (dn *dirnode) sync(ctx context.Context, throttle *throttle, names []string) error {
+// local-cluster persistent storage.
+//
+// Caller must have write lock on dn and the named children.
+//
+// If any children are dirs, they will be flushed recursively.
+func (dn *dirnode) flush(ctx context.Context, throttle *throttle, names []string, opts flushOpts) error {
cg := newContextGroup(ctx)
defer cg.Cancel()
goCommit := func(refs []fnSegmentRef) {
+ if len(refs) == 0 {
+ return
+ }
cg.Go(func() error {
- return dn.commitBlock(cg.Context(), throttle, refs)
+ throttle.Acquire()
+ defer throttle.Release()
+ return dn.commitBlock(cg.Context(), refs, opts.sync)
})
}
@@ -665,46 +739,86 @@ func (dn *dirnode) sync(ctx context.Context, throttle *throttle, names []string)
var pendingLen int = 0
localLocator := map[string]string{}
for _, name := range names {
- fn, ok := dn.inodes[name].(*filenode)
- if !ok {
- continue
- }
- for idx, seg := range fn.segments {
- switch seg := seg.(type) {
- case storedSegment:
- loc, ok := localLocator[seg.locator]
- if !ok {
- var err error
- loc, err = dn.fs.LocalLocator(seg.locator)
- if err != nil {
- return err
+ switch node := dn.inodes[name].(type) {
+ case *dirnode:
+ names := node.sortedNames()
+ for _, name := range names {
+ child := node.inodes[name]
+ child.Lock()
+ defer child.Unlock()
+ }
+ cg.Go(func() error { return node.flush(cg.Context(), throttle, node.sortedNames(), opts) })
+ case *filenode:
+ for idx, seg := range node.segments {
+ switch seg := seg.(type) {
+ case storedSegment:
+ loc, ok := localLocator[seg.locator]
+ if !ok {
+ var err error
+ loc, err = dn.fs.LocalLocator(seg.locator)
+ if err != nil {
+ return err
+ }
+ localLocator[seg.locator] = loc
}
- localLocator[seg.locator] = loc
- }
- seg.locator = loc
- fn.segments[idx] = seg
- case *memSegment:
- if seg.Len() > maxBlockSize/2 {
- goCommit([]fnSegmentRef{{fn, idx}})
- continue
- }
- if pendingLen+seg.Len() > maxBlockSize {
- goCommit(pending)
- pending = nil
- pendingLen = 0
+ seg.locator = loc
+ node.segments[idx] = seg
+ case *memSegment:
+ if seg.Len() > maxBlockSize/2 {
+ goCommit([]fnSegmentRef{{node, idx}})
+ continue
+ }
+ if pendingLen+seg.Len() > maxBlockSize {
+ goCommit(pending)
+ pending = nil
+ pendingLen = 0
+ }
+ pending = append(pending, fnSegmentRef{node, idx})
+ pendingLen += seg.Len()
+ default:
+ panic(fmt.Sprintf("can't sync segment type %T", seg))
}
- pending = append(pending, fnSegmentRef{fn, idx})
- pendingLen += seg.Len()
- default:
- panic(fmt.Sprintf("can't sync segment type %T", seg))
}
}
}
- goCommit(pending)
+ if opts.shortBlocks {
+ goCommit(pending)
+ }
return cg.Wait()
}
// caller must have write lock.
+func (dn *dirnode) memorySize() (size int64) {
+ for _, name := range dn.sortedNames() {
+ node := dn.inodes[name]
+ node.Lock()
+ defer node.Unlock()
+ switch node := node.(type) {
+ case *dirnode:
+ size += node.memorySize()
+ case *filenode:
+ for _, seg := range node.segments {
+ switch seg := seg.(type) {
+ case *memSegment:
+ size += int64(seg.Len())
+ }
+ }
+ }
+ }
+ return
+}
+
+// caller must have write lock.
+func (dn *dirnode) sortedNames() []string {
+ names := make([]string, 0, len(dn.inodes))
+ for name := range dn.inodes {
+ names = append(names, name)
+ }
+ sort.Strings(names)
+ return names
+}
+
+// caller must have write lock.
func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle *throttle) (string, error) {
cg := newContextGroup(ctx)
defer cg.Cancel()
@@ -720,11 +834,7 @@ func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle
return manifestEscape(prefix) + " d41d8cd98f00b204e9800998ecf8427e+0 0:0:\\056\n", nil
}
- names := make([]string, 0, len(dn.inodes))
- for name := range dn.inodes {
- names = append(names, name)
- }
- sort.Strings(names)
+ names := dn.sortedNames()
// Wait for children to finish any pending write operations
// before locking them.
@@ -772,7 +882,7 @@ func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle
var fileparts []filepart
var blocks []string
- if err := dn.sync(cg.Context(), throttle, names); err != nil {
+ if err := dn.flush(cg.Context(), throttle, filenames, flushOpts{sync: true, shortBlocks: true}); err != nil {
return err
}
for _, name := range filenames {
@@ -805,7 +915,7 @@ func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle
default:
// This can't happen: we
// haven't unlocked since
- // calling sync().
+ // calling flush(sync=true).
panic(fmt.Sprintf("can't marshal segment type %T", seg))
}
}
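
As a worked illustration of the packing thresholds used by flush() above (maxBlockSize is 1<<26, i.e. 64 MiB): a buffered segment larger than half a block is committed on its own, smaller segments accumulate until the pending batch would overflow a block, and whatever remains at the end is committed only when shortBlocks is true. The sketch below is not from the diff; the function name and segment sizes are hypothetical, and it mimics the rule on a plain slice of sizes rather than on real filenode segments.

    package example

    const maxBlockSize = 1 << 26 // 64 MiB, as in fs_collection.go

    // planCommits groups hypothetical in-memory segment sizes into the
    // batches that flush() would commit as Keep blocks.
    func planCommits(segSizes []int, shortBlocks bool) [][]int {
        var batches [][]int
        var pending []int
        pendingLen := 0
        for _, n := range segSizes {
            if n > maxBlockSize/2 {
                // Oversized segment: commit it by itself.
                batches = append(batches, []int{n})
                continue
            }
            if pendingLen+n > maxBlockSize {
                // Adding this segment would overflow the block:
                // commit what has accumulated so far.
                batches = append(batches, pending)
                pending, pendingLen = nil, 0
            }
            pending = append(pending, n)
            pendingLen += n
        }
        if shortBlocks && len(pending) > 0 {
            // Trailing short block: only committed on a full flush.
            batches = append(batches, pending)
        }
        return batches
    }
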
diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index 7fd03b120..8f977096d 100644
--- a/sdk/go/arvados/fs_collection_test.go
+++ b/sdk/go/arvados/fs_collection_test.go
@@ -1039,7 +1039,7 @@ func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
c.Check(err, check.ErrorMatches, `invalid flag.*`)
}
-func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
+func (s *CollectionFSSuite) TestFlushFullBlocksWritingLongFile(c *check.C) {
defer func(wab, mbs int) {
writeAheadBlocks = wab
maxBlockSize = mbs
@@ -1105,6 +1105,95 @@ func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
c.Check(currentMemExtents(), check.HasLen, 0)
}
+// Ensure blocks get flushed to disk if a lot of data is written to
+// small files/directories without calling sync().
+//
+// Write four 512KiB files into each of 256 top-level dirs (total
+// 512MiB), calling Flush() every 8 dirs. Ensure memory usage never
+// exceeds 24MiB (4 concurrentWriters * 2MiB + 8 unflushed dirs *
+// 2MiB).
+func (s *CollectionFSSuite) TestFlushAll(c *check.C) {
+ s.kc.onPut = func([]byte) {
+ // discard flushed data -- otherwise the stub will use
+ // unlimited memory
+ time.Sleep(time.Millisecond)
+ s.kc.Lock()
+ defer s.kc.Unlock()
+ s.kc.blocks = map[string][]byte{}
+ }
+ fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+ for i := 0; i < 256; i++ {
+ buf := bytes.NewBuffer(make([]byte, 524288))
+ fmt.Fprintf(buf, "test file in dir%d", i)
+
+ dir := fmt.Sprintf("dir%d", i)
+ fs.Mkdir(dir, 0755)
+ for j := 0; j < 2; j++ {
+ f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
+ c.Assert(err, check.IsNil)
+ defer f.Close()
+ _, err = io.Copy(f, buf)
+ c.Assert(err, check.IsNil)
+ }
+
+ if i%8 == 0 {
+ fs.Flush(true)
+ }
+
+ size := fs.memorySize()
+ if !c.Check(size <= 1<<24, check.Equals, true) {
+ c.Logf("at dir%d fs.memorySize()=%d", i, size)
+ return
+ }
+ }
+}
+
+// Ensure short blocks at the end of a stream don't get flushed by
+// Flush(false).
+//
+// Write 67x 1MiB files to each of 8 dirs, and check that 8 full 64MiB
+// blocks have been flushed while 8x 3MiB is still buffered in memory.
+func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
+ nDirs := 8
+ done := make(chan bool, nDirs)
+ var flushed int64
+ s.kc.onPut = func(p []byte) {
+ flushed += int64(len(p))
+ done <- true
+ }
+ fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+ megabyte := make([]byte, 1<<20)
+ for i := 0; i < nDirs; i++ {
+ dir := fmt.Sprintf("dir%d", i)
+ fs.Mkdir(dir, 0755)
+ for j := 0; j < 67; j++ {
+ f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
+ c.Assert(err, check.IsNil)
+ defer f.Close()
+ _, err = f.Write(megabyte)
+ c.Assert(err, check.IsNil)
+ }
+ }
+ c.Check(fs.memorySize(), check.Equals, int64(nDirs*67<<20))
+ c.Check(flushed, check.Equals, int64(0))
+
+ fs.Flush(false)
+ expectSize := int64(nDirs * 3 << 20)
+ // Wait for flush to finish
+ deadline := time.Now().Add(time.Second)
+ for range time.NewTicker(time.Millisecond).C {
+ if fs.memorySize() <= expectSize {
+ break
+ } else if time.Now().After(deadline) {
+ c.Fatal("timeout")
+ }
+ }
+ c.Check(fs.memorySize(), check.Equals, expectSize)
+ c.Check(flushed, check.Equals, int64(nDirs*64<<20))
+}
+
func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
for _, txt := range []string{
"\n",
diff --git a/services/crunch-run/copier.go b/services/crunch-run/copier.go
index 3f529f631..555a2654d 100644
--- a/services/crunch-run/copier.go
+++ b/services/crunch-run/copier.go
@@ -82,7 +82,18 @@ func (cp *copier) Copy() (string, error) {
return "", fmt.Errorf("error making directory %q in output collection: %v", d, err)
}
}
+ var lastparentdir string
for _, f := range cp.files {
+ // If a dir has just had its last file added, do a
+ // full Flush. Otherwise, do a partial Flush (write
+ // full-size blocks, but leave the last short block
+ // open so f's data can be packed with it).
+ dir, _ := filepath.Split(f.dst)
+ if err := fs.Flush(dir != lastparentdir); err != nil {
+ return "", fmt.Errorf("error flushing output collection file data: %v", err)
+ }
+ lastparentdir = dir
+
err = cp.copyFile(fs, f)
if err != nil {
return "", fmt.Errorf("error copying file %q into output collection: %v", f, err)
-----------------------------------------------------------------------