[ARVADOS] created: 1.3.0-1714-gb714ab740

Git user git at public.curoverse.com
Tue Oct 15 15:55:34 UTC 2019


        at  b714ab7401074991afe2fdc239c89107b3af6ca1 (commit)


commit b714ab7401074991afe2fdc239c89107b3af6ca1
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Tue Oct 15 11:51:25 2019 -0400

    15652: Add collectionfs Flush() method to control memory use.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/sdk/go/arvados/fs_base.go b/sdk/go/arvados/fs_base.go
index 3058a7609..f5916f957 100644
--- a/sdk/go/arvados/fs_base.go
+++ b/sdk/go/arvados/fs_base.go
@@ -86,7 +86,17 @@ type FileSystem interface {
 	Remove(name string) error
 	RemoveAll(name string) error
 	Rename(oldname, newname string) error
+
+	// Write buffered data from memory to storage, returning when
+	// all updates have been saved to persistent storage.
 	Sync() error
+
+	// Write buffered data from memory to storage, but don't wait
+	// for all writes to finish before returning. If shortBlocks
+	// is true, flush everything; otherwise, if there's less than
+	// a full block of buffered data at the end of a stream, leave
+	// it buffered in memory in case more data can be appended.
+	Flush(shortBlocks bool) error
 }
 
 type inode interface {
@@ -560,6 +570,11 @@ func (fs *fileSystem) Sync() error {
 	return ErrInvalidOperation
 }
 
+func (fs *fileSystem) Flush(bool) error {
+	log.Printf("TODO: flush fileSystem")
+	return ErrInvalidOperation
+}
+
 // rlookup (recursive lookup) returns the inode for the file/directory
 // with the given name (which may contain "/" separators). If no such
 // file/directory exists, the returned node is nil.
diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index 972b3979f..3a45dda29 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -21,7 +21,7 @@ import (
 
 var (
 	maxBlockSize      = 1 << 26
-	concurrentWriters = 4 // max goroutines writing to Keep during sync()
+	concurrentWriters = 4 // max goroutines writing to Keep during flush()
 	writeAheadBlocks  = 1 // max background jobs flushing to Keep before blocking writes
 )
 
@@ -38,6 +38,9 @@ type CollectionFileSystem interface {
 
 	// Total data bytes in all files.
 	Size() int64
+
+	// Memory consumed by buffered file data.
+	memorySize() int64
 }
 
 type collectionFileSystem struct {
@@ -143,6 +146,19 @@ func (fs *collectionFileSystem) Sync() error {
 	return nil
 }
 
+func (fs *collectionFileSystem) Flush(shortBlocks bool) error {
+	fs.fileSystem.root.Lock()
+	defer fs.fileSystem.root.Unlock()
+	dn := fs.fileSystem.root.(*dirnode)
+	return dn.flush(context.TODO(), newThrottle(concurrentWriters), dn.sortedNames(), flushOpts{sync: false, shortBlocks: shortBlocks})
+}
+
+func (fs *collectionFileSystem) memorySize() int64 {
+	fs.fileSystem.root.Lock()
+	defer fs.fileSystem.root.Unlock()
+	return fs.fileSystem.root.(*dirnode).memorySize()
+}
+
 func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
 	fs.fileSystem.root.Lock()
 	defer fs.fileSystem.root.Unlock()
@@ -501,7 +517,7 @@ func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePt
 // Write some data out to disk to reduce memory use. Caller must have
 // write lock.
 func (fn *filenode) pruneMemSegments() {
-	// TODO: share code with (*dirnode)sync()
+	// TODO: share code with (*dirnode)flush()
 	// TODO: pack/flush small blocks too, when fragmented
 	if fn.throttle == nil {
 		// TODO: share a throttle with filesystem
@@ -529,7 +545,7 @@ func (fn *filenode) pruneMemSegments() {
 			fn.throttle.Release()
 			fn.Lock()
 			defer fn.Unlock()
-			if curbuf := seg.buf[:1]; &curbuf[0] != &buf[0] {
+			if seg.flushing != done {
 				// A new seg.buf has been allocated.
 				return
 			}
@@ -556,8 +572,8 @@ func (fn *filenode) pruneMemSegments() {
 	}
 }
 
-// Block until all pending pruneMemSegments work is finished. Caller
-// must NOT have lock.
+// Block until all pending pruneMemSegments/flush work is
+// finished. Caller must NOT have lock.
 func (fn *filenode) waitPrune() {
 	var pending []<-chan struct{}
 	fn.Lock()
@@ -613,51 +629,132 @@ type fnSegmentRef struct {
 // storedSegments that reference the relevant portions of the new
 // block.
 //
+// If sync is false, commitBlock returns right away, after starting a
+// goroutine to do the writes, reacquire the filenodes' locks, and
+// swap out the *memSegments. Some filenodes' segments might get
+// modified/rearranged in the meantime, in which case commitBlock
+// won't replace them.
+//
 // Caller must have write lock.
-func (dn *dirnode) commitBlock(ctx context.Context, throttle *throttle, refs []fnSegmentRef) error {
-	if len(refs) == 0 {
-		return nil
-	}
-	throttle.Acquire()
-	defer throttle.Release()
+func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, sync bool) error {
 	if err := ctx.Err(); err != nil {
 		return err
 	}
+	done := make(chan struct{})
 	block := make([]byte, 0, maxBlockSize)
+	segs := make([]*memSegment, 0, len(refs))
 	for _, ref := range refs {
-		block = append(block, ref.fn.segments[ref.idx].(*memSegment).buf...)
-	}
-	locator, _, err := dn.fs.PutB(block)
-	if err != nil {
-		return err
+		seg := ref.fn.segments[ref.idx].(*memSegment)
+		if seg.flushing != nil && !sync {
+			// Let the other flushing goroutine finish. If
+			// it fails, we'll try again next time.
+			return nil
+		} else {
+			// In sync mode, we proceed regardless of
+			// whether another flush is in progress: It
+			// can't finish before we do, because we hold
+			// fn's lock until we finish our own writes.
+		}
+		seg.flushing = done
+		block = append(block, seg.buf...)
+		segs = append(segs, seg)
 	}
-	off := 0
-	for _, ref := range refs {
-		data := ref.fn.segments[ref.idx].(*memSegment).buf
-		ref.fn.segments[ref.idx] = storedSegment{
-			kc:      dn.fs,
-			locator: locator,
-			size:    len(block),
-			offset:  off,
-			length:  len(data),
+	errs := make(chan error, 1)
+	go func() {
+		defer close(done)
+		defer close(errs)
+		locked := map[*filenode]bool{}
+		locator, _, err := dn.fs.PutB(block)
+		{
+			if !sync {
+				for _, name := range dn.sortedNames() {
+					if fn, ok := dn.inodes[name].(*filenode); ok {
+						fn.Lock()
+						defer fn.Unlock()
+						locked[fn] = true
+					}
+				}
+			}
+			defer func() {
+				for _, seg := range segs {
+					if seg.flushing == done {
+						seg.flushing = nil
+					}
+				}
+			}()
+		}
+		if err != nil {
+			errs <- err
+			return
 		}
-		off += len(data)
-		ref.fn.memsize -= int64(len(data))
+		off := 0
+		for idx, ref := range refs {
+			if !sync {
+				// In async mode, fn's lock was
+				// released while we were waiting for
+				// PutB(); lots of things might have
+				// changed.
+				if len(ref.fn.segments) <= ref.idx {
+					// file segments have
+					// rearranged or changed in
+					// some way
+					continue
+				} else if seg, ok := ref.fn.segments[ref.idx].(*memSegment); !ok || seg != segs[idx] {
+					// segment has been replaced
+					continue
+				} else if seg.flushing != done {
+					// seg.buf has been replaced
+					continue
+				} else if !locked[ref.fn] {
+					// file was renamed, moved, or
+					// deleted since we called
+					// PutB
+					continue
+				}
+			}
+			data := ref.fn.segments[ref.idx].(*memSegment).buf
+			ref.fn.segments[ref.idx] = storedSegment{
+				kc:      dn.fs,
+				locator: locator,
+				size:    len(block),
+				offset:  off,
+				length:  len(data),
+			}
+			off += len(data)
+			ref.fn.memsize -= int64(len(data))
+		}
+	}()
+	if sync {
+		return <-errs
+	} else {
+		return nil
 	}
-	return nil
 }
 
-// sync flushes in-memory data and remote block references (for the
+type flushOpts struct {
+	sync        bool
+	shortBlocks bool
+}
+
+// flush in-memory data and remote-cluster block references (for the
 // children with the given names, which must be children of dn) to
-// local persistent storage. Caller must have write lock on dn and the
-// named children.
-func (dn *dirnode) sync(ctx context.Context, throttle *throttle, names []string) error {
+// local-cluster persistent storage.
+//
+// Caller must have write lock on dn and the named children.
+//
+// If any children are dirs, they will be flushed recursively.
+func (dn *dirnode) flush(ctx context.Context, throttle *throttle, names []string, opts flushOpts) error {
 	cg := newContextGroup(ctx)
 	defer cg.Cancel()
 
 	goCommit := func(refs []fnSegmentRef) {
+		if len(refs) == 0 {
+			return
+		}
 		cg.Go(func() error {
-			return dn.commitBlock(cg.Context(), throttle, refs)
+			throttle.Acquire()
+			defer throttle.Release()
+			return dn.commitBlock(cg.Context(), refs, opts.sync)
 		})
 	}
 
@@ -665,46 +762,86 @@ func (dn *dirnode) sync(ctx context.Context, throttle *throttle, names []string)
 	var pendingLen int = 0
 	localLocator := map[string]string{}
 	for _, name := range names {
-		fn, ok := dn.inodes[name].(*filenode)
-		if !ok {
-			continue
-		}
-		for idx, seg := range fn.segments {
-			switch seg := seg.(type) {
-			case storedSegment:
-				loc, ok := localLocator[seg.locator]
-				if !ok {
-					var err error
-					loc, err = dn.fs.LocalLocator(seg.locator)
-					if err != nil {
-						return err
+		switch node := dn.inodes[name].(type) {
+		case *dirnode:
+			names := node.sortedNames()
+			for _, name := range names {
+				child := node.inodes[name]
+				child.Lock()
+				defer child.Unlock()
+			}
+			cg.Go(func() error { return node.flush(cg.Context(), throttle, node.sortedNames(), opts) })
+		case *filenode:
+			for idx, seg := range node.segments {
+				switch seg := seg.(type) {
+				case storedSegment:
+					loc, ok := localLocator[seg.locator]
+					if !ok {
+						var err error
+						loc, err = dn.fs.LocalLocator(seg.locator)
+						if err != nil {
+							return err
+						}
+						localLocator[seg.locator] = loc
 					}
-					localLocator[seg.locator] = loc
-				}
-				seg.locator = loc
-				fn.segments[idx] = seg
-			case *memSegment:
-				if seg.Len() > maxBlockSize/2 {
-					goCommit([]fnSegmentRef{{fn, idx}})
-					continue
-				}
-				if pendingLen+seg.Len() > maxBlockSize {
-					goCommit(pending)
-					pending = nil
-					pendingLen = 0
+					seg.locator = loc
+					node.segments[idx] = seg
+				case *memSegment:
+					if seg.Len() > maxBlockSize/2 {
+						goCommit([]fnSegmentRef{{node, idx}})
+						continue
+					}
+					if pendingLen+seg.Len() > maxBlockSize {
+						goCommit(pending)
+						pending = nil
+						pendingLen = 0
+					}
+					pending = append(pending, fnSegmentRef{node, idx})
+					pendingLen += seg.Len()
+				default:
+					panic(fmt.Sprintf("can't sync segment type %T", seg))
 				}
-				pending = append(pending, fnSegmentRef{fn, idx})
-				pendingLen += seg.Len()
-			default:
-				panic(fmt.Sprintf("can't sync segment type %T", seg))
 			}
 		}
 	}
-	goCommit(pending)
+	if opts.shortBlocks {
+		goCommit(pending)
+	}
 	return cg.Wait()
 }
 
 // caller must have write lock.
+func (dn *dirnode) memorySize() (size int64) {
+	for _, name := range dn.sortedNames() {
+		node := dn.inodes[name]
+		node.Lock()
+		defer node.Unlock()
+		switch node := node.(type) {
+		case *dirnode:
+			size += node.memorySize()
+		case *filenode:
+			for _, seg := range node.segments {
+				switch seg := seg.(type) {
+				case *memSegment:
+					size += int64(seg.Len())
+				}
+			}
+		}
+	}
+	return
+}
+
+// caller must have write lock.
+func (dn *dirnode) sortedNames() []string {
+	names := make([]string, 0, len(dn.inodes))
+	for name := range dn.inodes {
+		names = append(names, name)
+	}
+	sort.Strings(names)
+	return names
+}
+
+// caller must have write lock.
 func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle *throttle) (string, error) {
 	cg := newContextGroup(ctx)
 	defer cg.Cancel()
@@ -720,11 +857,7 @@ func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle
 		return manifestEscape(prefix) + " d41d8cd98f00b204e9800998ecf8427e+0 0:0:\\056\n", nil
 	}
 
-	names := make([]string, 0, len(dn.inodes))
-	for name := range dn.inodes {
-		names = append(names, name)
-	}
-	sort.Strings(names)
+	names := dn.sortedNames()
 
 	// Wait for children to finish any pending write operations
 	// before locking them.
@@ -772,7 +905,7 @@ func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle
 
 		var fileparts []filepart
 		var blocks []string
-		if err := dn.sync(cg.Context(), throttle, names); err != nil {
+		if err := dn.flush(cg.Context(), throttle, filenames, flushOpts{sync: true, shortBlocks: true}); err != nil {
 			return err
 		}
 		for _, name := range filenames {
@@ -805,7 +938,7 @@ func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle
 				default:
 					// This can't happen: we
 					// haven't unlocked since
-					// calling sync().
+					// calling flush(sync=true).
 					panic(fmt.Sprintf("can't marshal segment type %T", seg))
 				}
 			}
diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index 7fd03b120..fe3ad7a1e 100644
--- a/sdk/go/arvados/fs_collection_test.go
+++ b/sdk/go/arvados/fs_collection_test.go
@@ -535,7 +535,7 @@ func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
 	}
 
 	maxBlockSize = 8
-	defer func() { maxBlockSize = 2 << 26 }()
+	defer func() { maxBlockSize = 1 << 26 }()
 
 	var wg sync.WaitGroup
 	for n := 0; n < 128; n++ {
@@ -1039,7 +1039,7 @@ func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
 	c.Check(err, check.ErrorMatches, `invalid flag.*`)
 }
 
-func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
+func (s *CollectionFSSuite) TestFlushFullBlocksWritingLongFile(c *check.C) {
 	defer func(wab, mbs int) {
 		writeAheadBlocks = wab
 		maxBlockSize = mbs
@@ -1105,6 +1105,91 @@ func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
 	c.Check(currentMemExtents(), check.HasLen, 0)
 }
 
+// Ensure blocks get flushed to disk if a lot of data is written to
+// small files/directories without calling sync().
+//
+// Write four 512KiB files into each of 256 top-level dirs (total
+// 512MiB), calling Flush() every 8 dirs. Ensure memory usage never
+// exceeds 24MiB (4 concurrentWriters * 2MiB + 8 unflushed dirs *
+// 2MiB).
+func (s *CollectionFSSuite) TestFlushAll(c *check.C) {
+	fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+	c.Assert(err, check.IsNil)
+
+	s.kc.onPut = func([]byte) {
+		// discard flushed data -- otherwise the stub will use
+		// unlimited memory
+		time.Sleep(time.Millisecond)
+		s.kc.Lock()
+		defer s.kc.Unlock()
+		s.kc.blocks = map[string][]byte{}
+	}
+	for i := 0; i < 256; i++ {
+		buf := bytes.NewBuffer(make([]byte, 524288))
+		fmt.Fprintf(buf, "test file in dir%d", i)
+
+		dir := fmt.Sprintf("dir%d", i)
+		fs.Mkdir(dir, 0755)
+		for j := 0; j < 2; j++ {
+			f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
+			c.Assert(err, check.IsNil)
+			defer f.Close()
+			_, err = io.Copy(f, buf)
+			c.Assert(err, check.IsNil)
+		}
+
+		if i%8 == 0 {
+			fs.Flush(true)
+		}
+
+		size := fs.memorySize()
+		if !c.Check(size <= 1<<24, check.Equals, true) {
+			c.Logf("at dir%d fs.memorySize()=%d", i, size)
+			return
+		}
+	}
+}
+
+// Ensure short blocks at the end of a stream don't get flushed by
+// Flush(false).
+//
+// Write 67x 1MiB files to each of 8 dirs, and check that 8 full 64MiB
+// blocks have been flushed while 8x 3MiB is still buffered in memory.
+func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
+	fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+	c.Assert(err, check.IsNil)
+
+	var flushed int64
+	s.kc.onPut = func(p []byte) {
+		atomic.AddInt64(&flushed, int64(len(p)))
+	}
+
+	nDirs := 8
+	megabyte := make([]byte, 1<<20)
+	for i := 0; i < nDirs; i++ {
+		dir := fmt.Sprintf("dir%d", i)
+		fs.Mkdir(dir, 0755)
+		for j := 0; j < 67; j++ {
+			f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
+			c.Assert(err, check.IsNil)
+			defer f.Close()
+			_, err = f.Write(megabyte)
+			c.Assert(err, check.IsNil)
+		}
+	}
+	c.Check(fs.memorySize(), check.Equals, int64(nDirs*67<<20))
+	c.Check(flushed, check.Equals, int64(0))
+
+	fs.Flush(false)
+	expectSize := int64(nDirs * 3 << 20)
+
+	// Wait for flush to finish
+	for deadline := time.Now().Add(5 * time.Second); fs.memorySize() > expectSize && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
+	}
+	c.Check(fs.memorySize(), check.Equals, expectSize)
+	c.Check(flushed, check.Equals, int64(nDirs*64<<20))
+}
+
 func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
 	for _, txt := range []string{
 		"\n",
diff --git a/services/crunch-run/copier.go b/services/crunch-run/copier.go
index 3f529f631..555a2654d 100644
--- a/services/crunch-run/copier.go
+++ b/services/crunch-run/copier.go
@@ -82,7 +82,18 @@ func (cp *copier) Copy() (string, error) {
 			return "", fmt.Errorf("error making directory %q in output collection: %v", d, err)
 		}
 	}
+	var lastparentdir string
 	for _, f := range cp.files {
+		// If a dir has just had its last file added, do a
+		// full Flush. Otherwise, do a partial Flush (write
+		// full-size blocks, but leave the last short block
+		// open so f's data can be packed with it).
+		dir, _ := filepath.Split(f.dst)
+		if err := fs.Flush(dir != lastparentdir); err != nil {
+			return "", fmt.Errorf("error flushing output collection file data: %v", err)
+		}
+		lastparentdir = dir
+
 		err = cp.copyFile(fs, f)
 		if err != nil {
 			return "", fmt.Errorf("error copying file %q into output collection: %v", f, err)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list