[ARVADOS] updated: 305490369b502d47607b7ffe790d2c85e9a8db34
Git user
git at public.curoverse.com
Thu Oct 5 08:47:39 EDT 2017
Summary of changes:
services/crunch-run/crunchrun.go | 143 ++++++++++++++++------------------
services/crunch-run/crunchrun_test.go | 20 ++---
services/crunch-run/upload.go | 78 ++++++-------------
services/crunch-run/upload_test.go | 39 +++++++---
4 files changed, 126 insertions(+), 154 deletions(-)
via 305490369b502d47607b7ffe790d2c85e9a8db34 (commit)
from 80ce9f9d06128195a1f8506fc5f32b6fb589cfcc (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit 305490369b502d47607b7ffe790d2c85e9a8db34
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Oct 5 08:47:04 2017 -0400
12183: Refactor file upload so there is one walk to handle both symlinks and
regular files.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 53ca1b9..b387b96 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -909,42 +909,47 @@ func (runner *ContainerRunner) WaitFinish() (err error) {
return nil
}
-// EvalSymlinks follows symlinks within the output directory. If the symlink
-// leads to a keep mount, copy the manifest text from the keep mount into the
-// output manifestText. Ensure that whether symlinks are relative or absolute,
-// they must remain within the output directory.
+// UploadOutputFile uploads files within the output directory, with special
+// handling for symlinks. If a symlink leads to a keep mount, copy the manifest
+// text from the keep mount into the output manifestText. Whether a symlink is
+// relative or absolute, it must resolve to a keep mount or to a location
+// within the output directory; otherwise an error is returned.
//
// Assumes initial value of "path" is absolute, and located within runner.HostOutputDir.
-func (runner *ContainerRunner) EvalSymlinks(path string, binds []string) (manifestText string, symlinksToRemove []string, err error) {
- var links []string
+func (runner *ContainerRunner) UploadOutputFile(
+ path string,
+ info os.FileInfo,
+ infoerr error,
+ binds []string,
+ walkUpload *WalkUpload,
+ relocateFrom string,
+ relocateTo string) (manifestText string, err error) {
- defer func() {
- if err != nil {
- symlinksToRemove = append(symlinksToRemove, links...)
- }
- }()
+ if infoerr != nil {
+ return "", infoerr
+ }
- for n := 0; n < 32; n++ {
- var info os.FileInfo
- info, err = os.Lstat(path)
- if err != nil {
- return
- }
+ relocated := relocateTo + path[len(relocateFrom):]
+
+ if info.Mode().IsRegular() {
+ return "", walkUpload.UploadFile(relocated, path)
+ }
+ // Not a regular file, try to follow symlinks
+ var nextlink = path
+ for n := 0; n < 32; n++ {
if info.Mode()&os.ModeSymlink == 0 {
- // Not a symlink, nothing to do.
+ // Not a symlink, don't do anything
return
}
- // Remember symlink for cleanup later
- links = append(links, path)
-
var readlinktgt string
- readlinktgt, err = os.Readlink(path)
- tgt := readlinktgt
+ readlinktgt, err = os.Readlink(nextlink)
if err != nil {
return
}
+
+ tgt := readlinktgt
if !strings.HasPrefix(tgt, "/") {
// Relative symlink, resolve it to host path
tgt = filepath.Join(filepath.Dir(path), tgt)
@@ -954,8 +959,6 @@ func (runner *ContainerRunner) EvalSymlinks(path string, binds []string) (manife
tgt = filepath.Join(runner.HostOutputDir, tgt[len(runner.Container.OutputPath):])
}
- runner.CrunchLog.Printf("Resolve %q to %q", path, tgt)
-
// go through mounts and try reverse map to collection reference
for _, bind := range binds {
mnt := runner.Container.Mounts[bind]
@@ -967,51 +970,56 @@ func (runner *ContainerRunner) EvalSymlinks(path string, binds []string) (manife
adjustedMount := mnt
adjustedMount.Path = filepath.Join(adjustedMount.Path, targetSuffix)
- for _, l := range links {
- // The chain of one or more symlinks
- // terminates in this keep mount, so
- // add them all to the manifest text at
- // appropriate locations.
- var m string
- outputSuffix := l[len(runner.HostOutputDir):]
- m, err = runner.getCollectionManifestForPath(adjustedMount, outputSuffix)
- if err != nil {
- return
- }
- manifestText = manifestText + m
- symlinksToRemove = append(symlinksToRemove, l)
- }
+ // Terminates in this keep mount, so add the
+ // manifest text at appropriate location.
+ outputSuffix := path[len(runner.HostOutputDir):]
+ manifestText, err = runner.getCollectionManifestForPath(adjustedMount, outputSuffix)
return
}
}
- // If target is not a mount, it must be within the output
- // directory, otherwise it is an error.
+ // If target is not a collection mount, it must be within the
+ // output directory, otherwise it is an error.
if !strings.HasPrefix(tgt, runner.HostOutputDir+"/") {
err = fmt.Errorf("Output directory symlink %q points to invalid location %q, must point to mount or output directory.",
path[len(runner.HostOutputDir):], readlinktgt)
return
}
- // Update symlink to host FS
- err = os.Remove(path)
+ info, err = os.Lstat(tgt)
if err != nil {
- err = fmt.Errorf("Error removing symlink %q: %v", path, err)
+ // tgt doesn't exist or lacks permissions
+ err = fmt.Errorf("Output directory symlink %q points to invalid location %q, must point to mount or output directory.",
+ path[len(runner.HostOutputDir):], readlinktgt)
return
}
- err = os.Symlink(tgt, path)
- if err != nil {
- err = fmt.Errorf("Error updating symlink %q: %v", path, err)
+ if info.Mode().IsRegular() {
+ // Symlink leads to regular file. Need to read from
+ // the target but upload it at the original path.
+ return "", walkUpload.UploadFile(relocated, tgt)
+ }
+
+ if info.Mode().IsDir() {
+ // Symlink leads to directory. Walk() doesn't follow
+ // directory symlinks, so we walk the target directory
+ // instead. Within the walk, file paths are relocated
+ // so they appear under the original symlink path.
+ err = filepath.Walk(tgt, func(walkpath string, walkinfo os.FileInfo, walkerr error) error {
+ var m string
+ m, walkerr = runner.UploadOutputFile(walkpath, walkinfo, walkerr, binds, walkUpload, tgt, relocated)
+ if walkerr == nil {
+ manifestText = manifestText + m
+ }
+ return walkerr
+ })
return
}
- // Target is within the output directory, so loop and check if
- // it is also a symlink.
- path = tgt
+ nextlink = tgt
}
// Got stuck in a loop or just a pathological number of links, give up.
- err = fmt.Errorf("Too many symlinks.")
+ err = fmt.Errorf("Followed too many symlinks from path %q", path)
return
}
@@ -1061,40 +1069,25 @@ func (runner *ContainerRunner) CaptureOutput() error {
if err != nil {
// Regular directory
- symlinksToRemove := make(map[string]bool)
+ cw := CollectionWriter{0, runner.Kc, nil, nil, sync.Mutex{}}
+ walkUpload := cw.BeginUpload(runner.HostOutputDir, runner.CrunchLog.Logger)
+
var m string
- var srm []string
- // Find symlinks to arv-mounted files & dirs.
err = filepath.Walk(runner.HostOutputDir, func(path string, info os.FileInfo, err error) error {
- if err != nil {
- return err
- }
- m, srm, err = runner.EvalSymlinks(path, binds)
- for _, r := range srm {
- symlinksToRemove[r] = true
- }
+ m, err = runner.UploadOutputFile(path, info, err, binds, walkUpload, "", "")
if err == nil {
manifestText = manifestText + m
}
return err
})
- for l, _ := range symlinksToRemove {
- err2 := os.Remove(l)
- if err2 != nil {
- if err == nil {
- err = fmt.Errorf("Error removing symlink %q: %v", err2)
- } else {
- err = fmt.Errorf("%v\nError removing symlink %q: %v",
- err, err2)
- }
- }
- }
+
+ cw.EndUpload(walkUpload)
+
if err != nil {
- return fmt.Errorf("While checking output symlinks: %v", err)
+ return fmt.Errorf("While uploading output files: %v", err)
}
- cw := CollectionWriter{0, runner.Kc, nil, nil, sync.Mutex{}}
- m, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
+ m, err = cw.ManifestText()
manifestText = manifestText + m
if err != nil {
return fmt.Errorf("While uploading output files: %v", err)
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index 2aaac25..76615ac 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/services/crunch-run/crunchrun_test.go
@@ -1690,27 +1690,19 @@ func (s *TestSuite) TestEvalSymlinks(c *C) {
os.Symlink("/etc/passwd", realTemp+"/p1")
// Relative outside output dir
- os.Symlink("../..", realTemp+"/p2")
+ os.Symlink("..", realTemp+"/p2")
// Circular references
os.Symlink("p4", realTemp+"/p3")
os.Symlink("p5", realTemp+"/p4")
os.Symlink("p3", realTemp+"/p5")
- symlinksToRemove := make(map[string]bool)
+ // Target doesn't exist
+ os.Symlink("p99", realTemp+"/p6")
+
for _, v := range []string{"p1", "p2", "p3", "p4", "p5"} {
- var srm []string
- _, srm, err = cr.EvalSymlinks(realTemp+"/"+v, []string{})
+ info, err := os.Lstat(realTemp + "/" + v)
+ _, err = cr.UploadOutputFile(realTemp+"/"+v, info, err, []string{}, nil, "", "")
c.Assert(err, NotNil)
- for _, r := range srm {
- symlinksToRemove[r] = true
- }
}
- c.Assert(len(symlinksToRemove), Equals, 5)
-
- c.Assert(map[string]bool{realTemp + "/" + "p1": true,
- realTemp + "/" + "p2": true,
- realTemp + "/" + "p3": true,
- realTemp + "/" + "p4": true,
- realTemp + "/" + "p5": true}, DeepEquals, symlinksToRemove)
}
diff --git a/services/crunch-run/upload.go b/services/crunch-run/upload.go
index 3127683..95925e5 100644
--- a/services/crunch-run/upload.go
+++ b/services/crunch-run/upload.go
@@ -18,15 +18,14 @@ import (
"crypto/md5"
"errors"
"fmt"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "git.curoverse.com/arvados.git/sdk/go/manifest"
"io"
"log"
"os"
"path/filepath"
"strings"
"sync"
-
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- "git.curoverse.com/arvados.git/sdk/go/manifest"
)
// Block is a data block in a manifest stream
@@ -263,48 +262,27 @@ type WalkUpload struct {
mtx sync.Mutex
}
-// WalkFunc walks a directory tree, uploads each file found and adds it to the
-// CollectionWriter.
-func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
- if err != nil {
- return err
- }
-
- targetPath, targetInfo := path, info
- if info.Mode()&os.ModeSymlink != 0 {
- // Update targetpath/info to reflect the symlink
- // target, not the symlink itself
- targetPath, err = filepath.EvalSymlinks(path)
- if err != nil {
- return err
- }
- targetInfo, err = os.Stat(targetPath)
- if err != nil {
- return fmt.Errorf("stat symlink %q target %q: %s", path, targetPath, err)
- }
- if targetInfo.IsDir() {
- // Symlinks to directories don't get walked, so do it
- // here. We've previously checked that they stay in
- // the output directory and don't result in an endless
- // loop.
- filepath.Walk(path+"/.", m.WalkFunc)
- }
- }
-
- if targetInfo.Mode()&os.ModeType != 0 {
- // Skip directories, pipes, other non-regular files
- return nil
- }
-
+func (m *WalkUpload) UploadFile(path string, sourcePath string) error {
var dir string
- if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
- dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
+ basename := filepath.Base(path)
+ if len(path) > (len(m.stripPrefix) + len(basename) + 1) {
+ dir = path[len(m.stripPrefix)+1 : (len(path) - len(basename) - 1)]
}
if dir == "" {
dir = "."
}
- fn := path[(len(path) - len(info.Name())):]
+ fn := path[(len(path) - len(basename)):]
+
+ info, err := os.Stat(sourcePath)
+ if err != nil {
+ return err
+ }
+ file, err := os.Open(sourcePath)
+ if err != nil {
+ return err
+ }
+ defer file.Close()
if m.streamMap[dir] == nil {
m.streamMap[dir] = &CollectionFileWriter{
@@ -334,16 +312,11 @@ func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
// Reset the CollectionFileWriter for a new file
fileWriter.NewFile(fn)
- file, err := os.Open(path)
- if err != nil {
- return err
- }
- defer file.Close()
-
m.status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
_, err = io.Copy(fileWriter, file)
if err != nil {
+ m.status.Printf("Uh oh")
return err
}
@@ -353,20 +326,15 @@ func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
return nil
}
-func (cw *CollectionWriter) WriteTree(root string, status *log.Logger) (manifest string, err error) {
+func (cw *CollectionWriter) BeginUpload(root string, status *log.Logger) *WalkUpload {
streamMap := make(map[string]*CollectionFileWriter)
- wu := &WalkUpload{0, cw.IKeepClient, root, streamMap, status, nil, sync.Mutex{}}
- err = filepath.Walk(root, wu.WalkFunc)
-
- if err != nil {
- return "", err
- }
+ return &WalkUpload{0, cw.IKeepClient, root, streamMap, status, nil, sync.Mutex{}}
+}
+func (cw *CollectionWriter) EndUpload(wu *WalkUpload) {
cw.mtx.Lock()
- for _, st := range streamMap {
+ for _, st := range wu.streamMap {
cw.Streams = append(cw.Streams, st)
}
cw.mtx.Unlock()
-
- return cw.ManifestText()
}
diff --git a/services/crunch-run/upload_test.go b/services/crunch-run/upload_test.go
index 96ea2b1..86dab41 100644
--- a/services/crunch-run/upload_test.go
+++ b/services/crunch-run/upload_test.go
@@ -5,13 +5,13 @@
package main
import (
+ . "gopkg.in/check.v1"
"io/ioutil"
"log"
"os"
+ "path/filepath"
"sync"
"syscall"
-
- . "gopkg.in/check.v1"
)
type UploadTestSuite struct{}
@@ -19,6 +19,25 @@ type UploadTestSuite struct{}
// Gocheck boilerplate
var _ = Suite(&UploadTestSuite{})
+func writeTree(cw *CollectionWriter, root string, status *log.Logger) (mt string, err error) {
+ walkUpload := cw.BeginUpload(root, status)
+
+ err = filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
+ info, _ = os.Stat(path)
+ if info.Mode().IsRegular() {
+ return walkUpload.UploadFile(path, path)
+ }
+ return nil
+ })
+
+ cw.EndUpload(walkUpload)
+ if err != nil {
+ return "", err
+ }
+ mt, err = cw.ManifestText()
+ return
+}
+
func (s *TestSuite) TestSimpleUpload(c *C) {
tmpdir, _ := ioutil.TempDir("", "")
defer func() {
@@ -28,12 +47,12 @@ func (s *TestSuite) TestSimpleUpload(c *C) {
ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
- str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+ str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
c.Check(err, IsNil)
c.Check(str, Equals, ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt\n")
}
-func (s *TestSuite) TestSimpleUploadThreefiles(c *C) {
+func (s *TestSuite) TestUploadThreeFiles(c *C) {
tmpdir, _ := ioutil.TempDir("", "")
defer func() {
os.RemoveAll(tmpdir)
@@ -49,7 +68,7 @@ func (s *TestSuite) TestSimpleUploadThreefiles(c *C) {
}
cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
- str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+ str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
c.Check(err, IsNil)
c.Check(str, Equals, ". aa65a413921163458c52fea478d5d3ee+9 0:3:file1.txt 3:3:file2.txt 6:3:file3.txt\n")
@@ -67,7 +86,7 @@ func (s *TestSuite) TestSimpleUploadSubdir(c *C) {
ioutil.WriteFile(tmpdir+"/subdir/file2.txt", []byte("bar"), 0600)
cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
- str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+ str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
c.Check(err, IsNil)
@@ -101,7 +120,7 @@ func (s *TestSuite) TestSimpleUploadLarge(c *C) {
ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
- str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+ str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
c.Check(err, IsNil)
c.Check(str, Equals, ". 00ecf01e0d93385115c9f8bed757425d+67108864 485cd630387b6b1846fe429f261ea05f+1048514 0:68157375:file1.txt 68157375:3:file2.txt\n")
@@ -118,7 +137,7 @@ func (s *TestSuite) TestUploadEmptySubdir(c *C) {
ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
- str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+ str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
c.Check(err, IsNil)
c.Check(str, Equals, `. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
@@ -134,7 +153,7 @@ func (s *TestSuite) TestUploadEmptyFile(c *C) {
ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte(""), 0600)
cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
- str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+ str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
c.Check(err, IsNil)
c.Check(str, Equals, `. d41d8cd98f00b204e9800998ecf8427e+0 0:0:file1.txt
@@ -150,7 +169,7 @@ func (s *TestSuite) TestUploadError(c *C) {
ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
cw := CollectionWriter{0, &KeepErrorTestClient{}, nil, nil, sync.Mutex{}}
- str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+ str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
c.Check(err, NotNil)
c.Check(str, Equals, "")
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list