[ARVADOS] updated: 1.3.0-1717-g5e109ae2b

Git user git at public.curoverse.com
Thu Oct 17 14:15:29 UTC 2019


Summary of changes:
 sdk/go/arvados/fs_base.go            |  8 ++++---
 sdk/go/arvados/fs_collection.go      | 30 +++++++++++++++++++++-----
 sdk/go/arvados/fs_collection_test.go | 41 +++++++++++++++++++++++++++---------
 services/crunch-run/copier.go        | 23 ++++++++++++--------
 4 files changed, 75 insertions(+), 27 deletions(-)

       via  5e109ae2b94f5dcaddb19da360507f73078e2d1c (commit)
      from  15917b2b3902994ca4f6b004577b1130ba0fdaf0 (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so they are listed
in full below.


commit 5e109ae2b94f5dcaddb19da360507f73078e2d1c
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Oct 17 10:01:47 2019 -0400

    15652: Only flush the current dir, and only once per 64 MiB.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/sdk/go/arvados/fs_base.go b/sdk/go/arvados/fs_base.go
index f5916f957..359e6b67e 100644
--- a/sdk/go/arvados/fs_base.go
+++ b/sdk/go/arvados/fs_base.go
@@ -95,8 +95,10 @@ type FileSystem interface {
 	// for all writes to finish before returning. If shortBlocks
 	// is true, flush everything; otherwise, if there's less than
 	// a full block of buffered data at the end of a stream, leave
-	// it buffered in memory in case more data can be appended.
-	Flush(shortBlocks bool) error
+	// it buffered in memory in case more data can be appended. If
+	// path is "", flush all dirs/streams; otherwise, flush only
+	// the specified dir/stream.
+	Flush(path string, shortBlocks bool) error
 }
 
 type inode interface {
@@ -570,7 +572,7 @@ func (fs *fileSystem) Sync() error {
 	return ErrInvalidOperation
 }
 
-func (fs *fileSystem) Flush(bool) error {
+func (fs *fileSystem) Flush(string, bool) error {
 	log.Printf("TODO: flush fileSystem")
 	return ErrInvalidOperation
 }
diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index 543d89385..0722da6b0 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -146,11 +146,31 @@ func (fs *collectionFileSystem) Sync() error {
 	return nil
 }
 
-func (fs *collectionFileSystem) Flush(shortBlocks bool) error {
-	fs.fileSystem.root.Lock()
-	defer fs.fileSystem.root.Unlock()
-	dn := fs.fileSystem.root.(*dirnode)
-	return dn.flush(context.TODO(), newThrottle(concurrentWriters), dn.sortedNames(), flushOpts{sync: false, shortBlocks: shortBlocks})
+func (fs *collectionFileSystem) Flush(path string, shortBlocks bool) error {
+	node, err := rlookup(fs.fileSystem.root, path)
+	if err != nil {
+		return err
+	}
+	dn, ok := node.(*dirnode)
+	if !ok {
+		return ErrNotADirectory
+	}
+	dn.Lock()
+	defer dn.Unlock()
+	names := dn.sortedNames()
+	if path != "" {
+		// Caller only wants to flush the specified dir,
+		// non-recursively.  Drop subdirs from the list of
+		// names.
+		var filenames []string
+		for _, name := range names {
+			if _, ok := dn.inodes[name].(*filenode); ok {
+				filenames = append(filenames, name)
+			}
+		}
+		names = filenames
+	}
+	return dn.flush(context.TODO(), newThrottle(concurrentWriters), names, flushOpts{sync: false, shortBlocks: shortBlocks})
 }
 
 func (fs *collectionFileSystem) memorySize() int64 {
diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index fe3ad7a1e..edc35ab9b 100644
--- a/sdk/go/arvados/fs_collection_test.go
+++ b/sdk/go/arvados/fs_collection_test.go
@@ -1139,7 +1139,7 @@ func (s *CollectionFSSuite) TestFlushAll(c *check.C) {
 		}
 
 		if i%8 == 0 {
-			fs.Flush(true)
+			fs.Flush("", true)
 		}
 
 		size := fs.memorySize()
@@ -1164,9 +1164,9 @@ func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
 		atomic.AddInt64(&flushed, int64(len(p)))
 	}
 
-	nDirs := 8
+	nDirs := int64(8)
 	megabyte := make([]byte, 1<<20)
-	for i := 0; i < nDirs; i++ {
+	for i := int64(0); i < nDirs; i++ {
 		dir := fmt.Sprintf("dir%d", i)
 		fs.Mkdir(dir, 0755)
 		for j := 0; j < 67; j++ {
@@ -1180,14 +1180,35 @@ func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
 	c.Check(fs.memorySize(), check.Equals, int64(nDirs*67<<20))
 	c.Check(flushed, check.Equals, int64(0))
 
-	fs.Flush(false)
-	expectSize := int64(nDirs * 3 << 20)
-
-	// Wait for flush to finish
-	for deadline := time.Now().Add(5 * time.Second); fs.memorySize() > expectSize && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
+	waitForFlush := func(expectUnflushed, expectFlushed int64) {
+		for deadline := time.Now().Add(5 * time.Second); fs.memorySize() > expectUnflushed && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
+		}
+		c.Check(fs.memorySize(), check.Equals, expectUnflushed)
+		c.Check(flushed, check.Equals, expectFlushed)
 	}
-	c.Check(fs.memorySize(), check.Equals, expectSize)
-	c.Check(flushed, check.Equals, int64(nDirs*64<<20))
+
+	// Nothing flushed yet
+	waitForFlush((nDirs*67)<<20, 0)
+
+	// Flushing a non-empty dir "/" is non-recursive and there are
+	// no top-level files, so this has no effect
+	fs.Flush("/", false)
+	waitForFlush((nDirs*67)<<20, 0)
+
+	// Flush the full block in dir0
+	fs.Flush("dir0", false)
+	waitForFlush((nDirs*67-64)<<20, 64<<20)
+
+	err = fs.Flush("dir-does-not-exist", false)
+	c.Check(err, check.NotNil)
+
+	// Flush full blocks in all dirs
+	fs.Flush("", false)
+	waitForFlush(nDirs*3<<20, nDirs*64<<20)
+
+	// Flush non-full blocks, too
+	fs.Flush("", true)
+	waitForFlush(0, nDirs*67<<20)
 }
 
 func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
diff --git a/services/crunch-run/copier.go b/services/crunch-run/copier.go
index 555a2654d..317db1a7f 100644
--- a/services/crunch-run/copier.go
+++ b/services/crunch-run/copier.go
@@ -82,6 +82,7 @@ func (cp *copier) Copy() (string, error) {
 			return "", fmt.Errorf("error making directory %q in output collection: %v", d, err)
 		}
 	}
+	var unflushed int64
 	var lastparentdir string
 	for _, f := range cp.files {
 		// If a dir has just had its last file added, do a
@@ -89,37 +90,41 @@ func (cp *copier) Copy() (string, error) {
 		// full-size blocks, but leave the last short block
 		// open so f's data can be packed with it).
 		dir, _ := filepath.Split(f.dst)
-		if err := fs.Flush(dir != lastparentdir); err != nil {
-			return "", fmt.Errorf("error flushing output collection file data: %v", err)
+		if dir != lastparentdir || unflushed > 1<<26 {
+			if err := fs.Flush("/"+lastparentdir, dir != lastparentdir); err != nil {
+				return "", fmt.Errorf("error flushing output collection file data: %v", err)
+			}
+			unflushed = 0
 		}
 		lastparentdir = dir
 
-		err = cp.copyFile(fs, f)
+		n, err := cp.copyFile(fs, f)
 		if err != nil {
 			return "", fmt.Errorf("error copying file %q into output collection: %v", f, err)
 		}
+		unflushed += n
 	}
 	return fs.MarshalManifest(".")
 }
 
-func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) error {
+func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) (int64, error) {
 	cp.logger.Printf("copying %q (%d bytes)", f.dst, f.size)
 	dst, err := fs.OpenFile(f.dst, os.O_CREATE|os.O_WRONLY, 0666)
 	if err != nil {
-		return err
+		return 0, err
 	}
 	src, err := os.Open(f.src)
 	if err != nil {
 		dst.Close()
-		return err
+		return 0, err
 	}
 	defer src.Close()
-	_, err = io.Copy(dst, src)
+	n, err := io.Copy(dst, src)
 	if err != nil {
 		dst.Close()
-		return err
+		return n, err
 	}
-	return dst.Close()
+	return n, dst.Close()
 }
 
 // Append to cp.manifest, cp.files, and cp.dirs so as to copy src (an

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list