[ARVADOS] updated: 305490369b502d47607b7ffe790d2c85e9a8db34

Git user git at public.curoverse.com
Thu Oct 5 08:47:39 EDT 2017


Summary of changes:
 services/crunch-run/crunchrun.go      | 143 ++++++++++++++++------------------
 services/crunch-run/crunchrun_test.go |  20 ++---
 services/crunch-run/upload.go         |  78 ++++++-------------
 services/crunch-run/upload_test.go    |  39 +++++++---
 4 files changed, 126 insertions(+), 154 deletions(-)

       via  305490369b502d47607b7ffe790d2c85e9a8db34 (commit)
      from  80ce9f9d06128195a1f8506fc5f32b6fb589cfcc (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit 305490369b502d47607b7ffe790d2c85e9a8db34
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Oct 5 08:47:04 2017 -0400

    12183: Refactor file upload so there is one walk to handle both symlinks and
    regular files.
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 53ca1b9..b387b96 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -909,42 +909,47 @@ func (runner *ContainerRunner) WaitFinish() (err error) {
 	return nil
 }
 
-// EvalSymlinks follows symlinks within the output directory.  If the symlink
-// leads to a keep mount, copy the manifest text from the keep mount into the
-// output manifestText.  Ensure that whether symlinks are relative or absolute,
-// they must remain within the output directory.
+// UploadFile uploads files within the output directory, with special handling
+// for symlinks. If the symlink leads to a keep mount, copy the manifest text
+// from the keep mount into the output manifestText.  Ensure that whether
+// symlinks are relative or absolute, they must remain within the output
+// directory.
 //
 // Assumes initial value of "path" is absolute, and located within runner.HostOutputDir.
-func (runner *ContainerRunner) EvalSymlinks(path string, binds []string) (manifestText string, symlinksToRemove []string, err error) {
-	var links []string
+func (runner *ContainerRunner) UploadOutputFile(
+	path string,
+	info os.FileInfo,
+	infoerr error,
+	binds []string,
+	walkUpload *WalkUpload,
+	relocateFrom string,
+	relocateTo string) (manifestText string, err error) {
 
-	defer func() {
-		if err != nil {
-			symlinksToRemove = append(symlinksToRemove, links...)
-		}
-	}()
+	if infoerr != nil {
+		return "", infoerr
+	}
 
-	for n := 0; n < 32; n++ {
-		var info os.FileInfo
-		info, err = os.Lstat(path)
-		if err != nil {
-			return
-		}
+	relocated := relocateTo + path[len(relocateFrom):]
+
+	if info.Mode().IsRegular() {
+		return "", walkUpload.UploadFile(relocated, path)
+	}
 
+	// Not a regular file, try to follow symlinks
+	var nextlink = path
+	for n := 0; n < 32; n++ {
 		if info.Mode()&os.ModeSymlink == 0 {
-			// Not a symlink, nothing to do.
+			// Not a symlink, don't do anything
 			return
 		}
 
-		// Remember symlink for cleanup later
-		links = append(links, path)
-
 		var readlinktgt string
-		readlinktgt, err = os.Readlink(path)
-		tgt := readlinktgt
+		readlinktgt, err = os.Readlink(nextlink)
 		if err != nil {
 			return
 		}
+
+		tgt := readlinktgt
 		if !strings.HasPrefix(tgt, "/") {
 			// Relative symlink, resolve it to host path
 			tgt = filepath.Join(filepath.Dir(path), tgt)
@@ -954,8 +959,6 @@ func (runner *ContainerRunner) EvalSymlinks(path string, binds []string) (manife
 			tgt = filepath.Join(runner.HostOutputDir, tgt[len(runner.Container.OutputPath):])
 		}
 
-		runner.CrunchLog.Printf("Resolve %q to %q", path, tgt)
-
 		// go through mounts and try reverse map to collection reference
 		for _, bind := range binds {
 			mnt := runner.Container.Mounts[bind]
@@ -967,51 +970,56 @@ func (runner *ContainerRunner) EvalSymlinks(path string, binds []string) (manife
 				adjustedMount := mnt
 				adjustedMount.Path = filepath.Join(adjustedMount.Path, targetSuffix)
 
-				for _, l := range links {
-					// The chain of one or more symlinks
-					// terminates in this keep mount, so
-					// add them all to the manifest text at
-					// appropriate locations.
-					var m string
-					outputSuffix := l[len(runner.HostOutputDir):]
-					m, err = runner.getCollectionManifestForPath(adjustedMount, outputSuffix)
-					if err != nil {
-						return
-					}
-					manifestText = manifestText + m
-					symlinksToRemove = append(symlinksToRemove, l)
-				}
+				// Terminates in this keep mount, so add the
+				// manifest text at appropriate location.
+				outputSuffix := path[len(runner.HostOutputDir):]
+				manifestText, err = runner.getCollectionManifestForPath(adjustedMount, outputSuffix)
 				return
 			}
 		}
 
-		// If target is not a mount, it must be within the output
-		// directory, otherwise it is an error.
+		// If target is not a collection mount, it must be within the
+		// output directory, otherwise it is an error.
 		if !strings.HasPrefix(tgt, runner.HostOutputDir+"/") {
 			err = fmt.Errorf("Output directory symlink %q points to invalid location %q, must point to mount or output directory.",
 				path[len(runner.HostOutputDir):], readlinktgt)
 			return
 		}
 
-		// Update symlink to host FS
-		err = os.Remove(path)
+		info, err = os.Lstat(tgt)
 		if err != nil {
-			err = fmt.Errorf("Error removing symlink %q: %v", path, err)
+			// tgt doesn't exist or lacks permissions
+			err = fmt.Errorf("Output directory symlink %q points to invalid location %q, must point to mount or output directory.",
+				path[len(runner.HostOutputDir):], readlinktgt)
 			return
 		}
 
-		err = os.Symlink(tgt, path)
-		if err != nil {
-			err = fmt.Errorf("Error updating symlink %q: %v", path, err)
+		if info.Mode().IsRegular() {
+			// Symlink leads to regular file.  Need to read from
+			// the target but upload it at the original path.
+			return "", walkUpload.UploadFile(relocated, tgt)
+		}
+
+		if info.Mode().IsDir() {
+			// Symlink leads to directory.  Walk() doesn't follow
+			// directory symlinks, so we walk the target directory
+			// instead.  Within the walk, file paths are relocated
+			// so they appear under the original symlink path.
+			err = filepath.Walk(tgt, func(walkpath string, walkinfo os.FileInfo, walkerr error) error {
+				var m string
+				m, walkerr = runner.UploadOutputFile(walkpath, walkinfo, walkerr, binds, walkUpload, tgt, relocated)
+				if walkerr == nil {
+					manifestText = manifestText + m
+				}
+				return walkerr
+			})
 			return
 		}
 
-		// Target is within the output directory, so loop and check if
-		// it is also a symlink.
-		path = tgt
+		nextlink = tgt
 	}
 	// Got stuck in a loop or just a pathological number of links, give up.
-	err = fmt.Errorf("Too many symlinks.")
+	err = fmt.Errorf("Followed too many symlinks from path %q", path)
 	return
 }
 
@@ -1061,40 +1069,25 @@ func (runner *ContainerRunner) CaptureOutput() error {
 	if err != nil {
 		// Regular directory
 
-		symlinksToRemove := make(map[string]bool)
+		cw := CollectionWriter{0, runner.Kc, nil, nil, sync.Mutex{}}
+		walkUpload := cw.BeginUpload(runner.HostOutputDir, runner.CrunchLog.Logger)
+
 		var m string
-		var srm []string
-		// Find symlinks to arv-mounted files & dirs.
 		err = filepath.Walk(runner.HostOutputDir, func(path string, info os.FileInfo, err error) error {
-			if err != nil {
-				return err
-			}
-			m, srm, err = runner.EvalSymlinks(path, binds)
-			for _, r := range srm {
-				symlinksToRemove[r] = true
-			}
+			m, err = runner.UploadOutputFile(path, info, err, binds, walkUpload, "", "")
 			if err == nil {
 				manifestText = manifestText + m
 			}
 			return err
 		})
-		for l, _ := range symlinksToRemove {
-			err2 := os.Remove(l)
-			if err2 != nil {
-				if err == nil {
-					err = fmt.Errorf("Error removing symlink %q: %v", err2)
-				} else {
-					err = fmt.Errorf("%v\nError removing symlink %q: %v",
-						err, err2)
-				}
-			}
-		}
+
+		cw.EndUpload(walkUpload)
+
 		if err != nil {
-			return fmt.Errorf("While checking output symlinks: %v", err)
+			return fmt.Errorf("While uploading output files: %v", err)
 		}
 
-		cw := CollectionWriter{0, runner.Kc, nil, nil, sync.Mutex{}}
-		m, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
+		m, err = cw.ManifestText()
 		manifestText = manifestText + m
 		if err != nil {
 			return fmt.Errorf("While uploading output files: %v", err)
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index 2aaac25..76615ac 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/services/crunch-run/crunchrun_test.go
@@ -1690,27 +1690,19 @@ func (s *TestSuite) TestEvalSymlinks(c *C) {
 	os.Symlink("/etc/passwd", realTemp+"/p1")
 
 	// Relative outside output dir
-	os.Symlink("../..", realTemp+"/p2")
+	os.Symlink("..", realTemp+"/p2")
 
 	// Circular references
 	os.Symlink("p4", realTemp+"/p3")
 	os.Symlink("p5", realTemp+"/p4")
 	os.Symlink("p3", realTemp+"/p5")
 
-	symlinksToRemove := make(map[string]bool)
+	// Target doesn't exist
+	os.Symlink("p99", realTemp+"/p6")
+
 	for _, v := range []string{"p1", "p2", "p3", "p4", "p5"} {
-		var srm []string
-		_, srm, err = cr.EvalSymlinks(realTemp+"/"+v, []string{})
+		info, err := os.Lstat(realTemp + "/" + v)
+		_, err = cr.UploadOutputFile(realTemp+"/"+v, info, err, []string{}, nil, "", "")
 		c.Assert(err, NotNil)
-		for _, r := range srm {
-			symlinksToRemove[r] = true
-		}
 	}
-	c.Assert(len(symlinksToRemove), Equals, 5)
-
-	c.Assert(map[string]bool{realTemp + "/" + "p1": true,
-		realTemp + "/" + "p2": true,
-		realTemp + "/" + "p3": true,
-		realTemp + "/" + "p4": true,
-		realTemp + "/" + "p5": true}, DeepEquals, symlinksToRemove)
 }
diff --git a/services/crunch-run/upload.go b/services/crunch-run/upload.go
index 3127683..95925e5 100644
--- a/services/crunch-run/upload.go
+++ b/services/crunch-run/upload.go
@@ -18,15 +18,14 @@ import (
 	"crypto/md5"
 	"errors"
 	"fmt"
+	"git.curoverse.com/arvados.git/sdk/go/keepclient"
+	"git.curoverse.com/arvados.git/sdk/go/manifest"
 	"io"
 	"log"
 	"os"
 	"path/filepath"
 	"strings"
 	"sync"
-
-	"git.curoverse.com/arvados.git/sdk/go/keepclient"
-	"git.curoverse.com/arvados.git/sdk/go/manifest"
 )
 
 // Block is a data block in a manifest stream
@@ -263,48 +262,27 @@ type WalkUpload struct {
 	mtx         sync.Mutex
 }
 
-// WalkFunc walks a directory tree, uploads each file found and adds it to the
-// CollectionWriter.
-func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
-	if err != nil {
-		return err
-	}
-
-	targetPath, targetInfo := path, info
-	if info.Mode()&os.ModeSymlink != 0 {
-		// Update targetpath/info to reflect the symlink
-		// target, not the symlink itself
-		targetPath, err = filepath.EvalSymlinks(path)
-		if err != nil {
-			return err
-		}
-		targetInfo, err = os.Stat(targetPath)
-		if err != nil {
-			return fmt.Errorf("stat symlink %q target %q: %s", path, targetPath, err)
-		}
-		if targetInfo.IsDir() {
-			// Symlinks to directories don't get walked, so do it
-			// here.  We've previously checked that they stay in
-			// the output directory and don't result in an endless
-			// loop.
-			filepath.Walk(path+"/.", m.WalkFunc)
-		}
-	}
-
-	if targetInfo.Mode()&os.ModeType != 0 {
-		// Skip directories, pipes, other non-regular files
-		return nil
-	}
-
+func (m *WalkUpload) UploadFile(path string, sourcePath string) error {
 	var dir string
-	if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
-		dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
+	basename := filepath.Base(path)
+	if len(path) > (len(m.stripPrefix) + len(basename) + 1) {
+		dir = path[len(m.stripPrefix)+1 : (len(path) - len(basename) - 1)]
 	}
 	if dir == "" {
 		dir = "."
 	}
 
-	fn := path[(len(path) - len(info.Name())):]
+	fn := path[(len(path) - len(basename)):]
+
+	info, err := os.Stat(sourcePath)
+	if err != nil {
+		return err
+	}
+	file, err := os.Open(sourcePath)
+	if err != nil {
+		return err
+	}
+	defer file.Close()
 
 	if m.streamMap[dir] == nil {
 		m.streamMap[dir] = &CollectionFileWriter{
@@ -334,16 +312,11 @@ func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
 	// Reset the CollectionFileWriter for a new file
 	fileWriter.NewFile(fn)
 
-	file, err := os.Open(path)
-	if err != nil {
-		return err
-	}
-	defer file.Close()
-
 	m.status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
 
 	_, err = io.Copy(fileWriter, file)
 	if err != nil {
+		m.status.Printf("Uh oh")
 		return err
 	}
 
@@ -353,20 +326,15 @@ func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
 	return nil
 }
 
-func (cw *CollectionWriter) WriteTree(root string, status *log.Logger) (manifest string, err error) {
+func (cw *CollectionWriter) BeginUpload(root string, status *log.Logger) *WalkUpload {
 	streamMap := make(map[string]*CollectionFileWriter)
-	wu := &WalkUpload{0, cw.IKeepClient, root, streamMap, status, nil, sync.Mutex{}}
-	err = filepath.Walk(root, wu.WalkFunc)
-
-	if err != nil {
-		return "", err
-	}
+	return &WalkUpload{0, cw.IKeepClient, root, streamMap, status, nil, sync.Mutex{}}
+}
 
+func (cw *CollectionWriter) EndUpload(wu *WalkUpload) {
 	cw.mtx.Lock()
-	for _, st := range streamMap {
+	for _, st := range wu.streamMap {
 		cw.Streams = append(cw.Streams, st)
 	}
 	cw.mtx.Unlock()
-
-	return cw.ManifestText()
 }
diff --git a/services/crunch-run/upload_test.go b/services/crunch-run/upload_test.go
index 96ea2b1..86dab41 100644
--- a/services/crunch-run/upload_test.go
+++ b/services/crunch-run/upload_test.go
@@ -5,13 +5,13 @@
 package main
 
 import (
+	. "gopkg.in/check.v1"
 	"io/ioutil"
 	"log"
 	"os"
+	"path/filepath"
 	"sync"
 	"syscall"
-
-	. "gopkg.in/check.v1"
 )
 
 type UploadTestSuite struct{}
@@ -19,6 +19,25 @@ type UploadTestSuite struct{}
 // Gocheck boilerplate
 var _ = Suite(&UploadTestSuite{})
 
+func writeTree(cw *CollectionWriter, root string, status *log.Logger) (mt string, err error) {
+	walkUpload := cw.BeginUpload(root, status)
+
+	err = filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
+		info, _ = os.Stat(path)
+		if info.Mode().IsRegular() {
+			return walkUpload.UploadFile(path, path)
+		}
+		return nil
+	})
+
+	cw.EndUpload(walkUpload)
+	if err != nil {
+		return "", err
+	}
+	mt, err = cw.ManifestText()
+	return
+}
+
 func (s *TestSuite) TestSimpleUpload(c *C) {
 	tmpdir, _ := ioutil.TempDir("", "")
 	defer func() {
@@ -28,12 +47,12 @@ func (s *TestSuite) TestSimpleUpload(c *C) {
 	ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
 
 	cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
-	str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+	str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
 	c.Check(err, IsNil)
 	c.Check(str, Equals, ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt\n")
 }
 
-func (s *TestSuite) TestSimpleUploadThreefiles(c *C) {
+func (s *TestSuite) TestUploadThreeFiles(c *C) {
 	tmpdir, _ := ioutil.TempDir("", "")
 	defer func() {
 		os.RemoveAll(tmpdir)
@@ -49,7 +68,7 @@ func (s *TestSuite) TestSimpleUploadThreefiles(c *C) {
 	}
 
 	cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
-	str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+	str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
 
 	c.Check(err, IsNil)
 	c.Check(str, Equals, ". aa65a413921163458c52fea478d5d3ee+9 0:3:file1.txt 3:3:file2.txt 6:3:file3.txt\n")
@@ -67,7 +86,7 @@ func (s *TestSuite) TestSimpleUploadSubdir(c *C) {
 	ioutil.WriteFile(tmpdir+"/subdir/file2.txt", []byte("bar"), 0600)
 
 	cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
-	str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+	str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
 
 	c.Check(err, IsNil)
 
@@ -101,7 +120,7 @@ func (s *TestSuite) TestSimpleUploadLarge(c *C) {
 	ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
 
 	cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
-	str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+	str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
 
 	c.Check(err, IsNil)
 	c.Check(str, Equals, ". 00ecf01e0d93385115c9f8bed757425d+67108864 485cd630387b6b1846fe429f261ea05f+1048514 0:68157375:file1.txt 68157375:3:file2.txt\n")
@@ -118,7 +137,7 @@ func (s *TestSuite) TestUploadEmptySubdir(c *C) {
 	ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
 
 	cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
-	str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+	str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
 
 	c.Check(err, IsNil)
 	c.Check(str, Equals, `. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
@@ -134,7 +153,7 @@ func (s *TestSuite) TestUploadEmptyFile(c *C) {
 	ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte(""), 0600)
 
 	cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
-	str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+	str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
 
 	c.Check(err, IsNil)
 	c.Check(str, Equals, `. d41d8cd98f00b204e9800998ecf8427e+0 0:0:file1.txt
@@ -150,7 +169,7 @@ func (s *TestSuite) TestUploadError(c *C) {
 	ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
 
 	cw := CollectionWriter{0, &KeepErrorTestClient{}, nil, nil, sync.Mutex{}}
-	str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+	str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
 
 	c.Check(err, NotNil)
 	c.Check(str, Equals, "")

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list