[ARVADOS] created: 1.1.3-336-g826bc43
Git user
git at public.curoverse.com
Thu Apr 5 15:51:31 EDT 2018
at 826bc4316d3afb0013b9951c37137323d0e00b08 (commit)
commit 826bc4316d3afb0013b9951c37137323d0e00b08
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Thu Apr 5 02:31:10 2018 -0400
13100: Use collection FS for output and logs.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/services/crunch-run/copier.go b/services/crunch-run/copier.go
new file mode 100644
index 0000000..273ec79
--- /dev/null
+++ b/services/crunch-run/copier.go
@@ -0,0 +1,387 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "os"
+ "path/filepath"
+ "sort"
+ "strings"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/manifest"
+)
+
+// printfer is the minimal logging interface copier needs.
+type printfer interface {
+ Printf(string, ...interface{})
+}
+
+// errTooManySymlinks is returned when resolving an output path
+// would follow more than limitFollowSymlinks symlinks.
+var errTooManySymlinks = errors.New("too many symlinks, or symlink cycle")
+
+// limitFollowSymlinks is the maximum number of symlinks followed
+// while resolving a single output path.
+const limitFollowSymlinks = 10
+
+// filetodo is a regular file to be copied from the host filesystem
+// (src) to the output collection (dst).
+type filetodo struct {
+ src string
+ dst string
+ size int64
+}
+
+// copier copies data from a finished container's output path to a new
+// Arvados collection.
+//
+// Regular files (and symlinks to regular files) in hostOutputDir are
+// copied from the local filesystem.
+//
+// Symlinks to mounted collections, and any collections mounted under
+// ctrOutputDir, are copied by transforming the relevant parts of the
+// existing manifests, without moving any data around.
+//
+// Symlinks to other parts of the container's filesystem result in
+// errors.
+//
+// To use a copier, call Copy(). It first calls walkMount() to inspect
+// the output directory, grab the necessary parts of already-stored
+// collections, and prepare a list of files that need to be copied
+// from the local filesystem; then it copies the file data and returns
+// a complete output manifest.
+type copier struct {
+ client *arvados.Client
+ arvClient IArvadosClient
+ keepClient IKeepClient
+ hostOutputDir string
+ ctrOutputDir string
+ mounts map[string]arvados.Mount
+ secretMounts map[string]arvados.Mount
+ logger printfer
+
+ // Accumulated by walkMount and friends, consumed by Copy.
+ dirs []string
+ files []filetodo
+ manifest string
+
+ // Collection records' manifests already fetched, keyed by PDH.
+ manifestCache map[string]*manifest.Manifest
+}
+
+// Copy copies data as needed, and returns a new manifest.
+//
+// A directory listed in cp.dirs can already exist in the new
+// collection (for example, when a collection is mounted below it,
+// extracting that collection's manifest creates it), so an "already
+// exists" error from Mkdir is not a failure.
+func (cp *copier) Copy() (string, error) {
+ err := cp.walkMount("", cp.ctrOutputDir, limitFollowSymlinks, true)
+ if err != nil {
+ return "", err
+ }
+ fs, err := (&arvados.Collection{ManifestText: cp.manifest}).FileSystem(cp.client, cp.keepClient)
+ if err != nil {
+ return "", err
+ }
+ for _, d := range cp.dirs {
+ err = fs.Mkdir(d, 0777)
+ if err != nil && !os.IsExist(err) {
+ return "", fmt.Errorf("error making directory %q in output collection: %v", d, err)
+ }
+ }
+ for _, f := range cp.files {
+ err = cp.copyFile(fs, f)
+ if err != nil {
+ return "", fmt.Errorf("error copying %q to output collection: %v", f.dst, err)
+ }
+ }
+ return fs.MarshalManifest(".")
+}
+
+// copyFile writes the content of host file f.src to f.dst in fs,
+// creating f.dst if needed.
+func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) error {
+ cp.logger.Printf("copying %q (%d bytes)", f.dst, f.size)
+ dst, err := fs.OpenFile(f.dst, os.O_CREATE|os.O_WRONLY, 0666)
+ if err != nil {
+ return err
+ }
+ src, err := os.Open(f.src)
+ if err != nil {
+ dst.Close()
+ return err
+ }
+ defer src.Close()
+ _, err = io.Copy(dst, src)
+ if err != nil {
+ dst.Close()
+ return err
+ }
+ return dst.Close()
+}
+
+// Append to cp.manifest, cp.files, and cp.dirs so as to copy src (an
+// absolute path in the container's filesystem) to dest (an absolute
+// path in the output collection, or "" for output root).
+//
+// src must be (or be a descendant of) a readonly "collection" mount,
+// a writable collection mounted at ctrOutputPath, or a "tmp" mount.
+//
+// If walkMountsBelow is true, include contents of any collection
+// mounted below src as well.
+func (cp *copier) walkMount(dest, src string, maxSymlinks int, walkMountsBelow bool) error {
+ // srcRoot, srcMount indicate the innermost mount that
+ // contains src.
+ var srcRoot string
+ var srcMount arvados.Mount
+ for root, mnt := range cp.mounts {
+ if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") {
+ srcRoot, srcMount = root, mnt
+ }
+ }
+ for root := range cp.secretMounts {
+ if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") {
+ // Silently omit secrets, and symlinks to
+ // secrets.
+ return nil
+ }
+ }
+ if srcRoot == "" {
+ return fmt.Errorf("cannot output file %q: not in any mount", src)
+ }
+
+ // srcRelPath is the path to the file/dir we are trying to
+ // copy, relative to its mount point -- ".", "./foo.txt", ...
+ srcRelPath := filepath.Join(".", srcMount.Path, src[len(srcRoot):])
+
+ switch {
+ case srcMount.ExcludeFromOutput:
+ // Omit this mount's own content; mounts below
+ // it are still walked if walkMountsBelow is true.
+ case srcMount.Kind == "tmp":
+ // Handle by walking the host filesystem.
+ return cp.walkHostFS(dest, src, maxSymlinks, walkMountsBelow)
+ case srcMount.Kind != "collection":
+ return fmt.Errorf("%q: unsupported mount %q in output (kind is %q)", src, srcRoot, srcMount.Kind)
+ case !srcMount.Writable:
+ mft, err := cp.getManifest(srcMount.PortableDataHash)
+ if err != nil {
+ return err
+ }
+ err = cp.appendExtracted(mft, srcRelPath, dest)
+ if err != nil {
+ return fmt.Errorf("collection %s: %s", srcMount.PortableDataHash, err)
+ }
+ case srcRoot == cp.ctrOutputDir:
+ f, err := os.Open(filepath.Join(cp.hostOutputDir, ".arvados#collection"))
+ if err != nil {
+ return err
+ }
+ var coll arvados.Collection
+ err = json.NewDecoder(f).Decode(&coll)
+ f.Close()
+ if err != nil {
+ return err
+ }
+ err = cp.appendExtracted(&manifest.Manifest{Text: coll.ManifestText}, srcRelPath, dest)
+ if err != nil {
+ return fmt.Errorf("output collection: %s", err)
+ }
+ default:
+ return fmt.Errorf("cannot output %q as %q: writable collection mounted at %q", src, dest, srcRoot)
+ }
+ if !walkMountsBelow {
+ return nil
+ }
+ return cp.walkMountsBelow(dest, src)
+}
+
+// appendExtracted appends the part of mft found at srcRelPath,
+// relocated to dest, to cp.manifest. It returns an error (rather
+// than silently appending nothing) if mft cannot be parsed.
+func (cp *copier) appendExtracted(mft *manifest.Manifest, srcRelPath, dest string) error {
+ extracted := mft.Extract(srcRelPath, dest)
+ if extracted.Err != nil {
+ return fmt.Errorf("error extracting %q from manifest: %s", srcRelPath, extracted.Err)
+ }
+ cp.manifest += extracted.Text
+ return nil
+}
+
+// walkMountsBelow calls walkMount for each mount located strictly
+// below src (except "text" and "json" mounts), using the matching
+// path below dest as its destination.
+func (cp *copier) walkMountsBelow(dest, src string) error {
+ for mnt, mntinfo := range cp.mounts {
+ if !strings.HasPrefix(mnt, src+"/") {
+ continue
+ }
+ if mntinfo.Kind == "text" || mntinfo.Kind == "json" {
+ // These got copied into the nearest parent
+ // mount as regular files during setup, so
+ // they get copied as regular files when we
+ // process the parent. Output will reflect any
+ // changes and deletions done by the
+ // container.
+ continue
+ }
+ // Example: we are processing dest=/foo src=/mnt1/dir1
+ // (perhaps we followed a symlink /outdir/foo ->
+ // /mnt1/dir1). Caller has already processed the
+ // collection mounted at /mnt1, but now we find that
+ // /mnt1/dir1/mnt2 is also a mount, so we need to copy
+ // src=/mnt1/dir1/mnt2 to dest=/foo/mnt2.
+ //
+ // We handle all descendants of /mnt1/dir1 in this
+ // loop instead of using recursion:
+ // /mnt1/dir1/mnt2/mnt3 is a child of both /mnt1 and
+ // /mnt1/dir1/mnt2, but we only want to walk it
+ // once. (This simplification is safe because mounted
+ // collections cannot contain symlinks.)
+ err := cp.walkMount(dest+mnt[len(src):], mnt, 0, false)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// Add entries to cp.dirs and cp.files so as to copy src (an absolute
+// path in the container's filesystem which corresponds to a real file
+// or directory in cp.hostOutputDir) to dest (an absolute path in the
+// output collection, or "" for output root).
+//
+// Always follow symlinks.
+//
+// If includeMounts is true, include mounts at and below src.
+// Otherwise, skip them.
+//
+// src must be cp.ctrOutputDir or a path below it; anything else (such
+// as a "tmp" mount elsewhere in the container) has no corresponding
+// path in cp.hostOutputDir and is rejected with an error.
+func (cp *copier) walkHostFS(dest, src string, maxSymlinks int, includeMounts bool) error {
+ if src != cp.ctrOutputDir && !strings.HasPrefix(src, cp.ctrOutputDir+"/") {
+ // Slicing src below would otherwise yield a
+ // bogus host path, or panic if src is shorter
+ // than ctrOutputDir.
+ return fmt.Errorf("cannot output file %q: not in output directory %q", src, cp.ctrOutputDir)
+ }
+ if includeMounts {
+ err := cp.walkMountsBelow(dest, src)
+ if err != nil {
+ return err
+ }
+ }
+
+ hostsrc := cp.hostOutputDir + src[len(cp.ctrOutputDir):]
+
+ // If src is a symlink, walk its target.
+ fi, err := os.Lstat(hostsrc)
+ if err != nil {
+ return fmt.Errorf("lstat %q: %s", src, err)
+ }
+ if fi.Mode()&os.ModeSymlink != 0 {
+ if maxSymlinks < 0 {
+ return errTooManySymlinks
+ }
+ target, err := os.Readlink(hostsrc)
+ if err != nil {
+ return fmt.Errorf("readlink %q: %s", src, err)
+ }
+ if !strings.HasPrefix(target, "/") {
+ target = filepath.Join(filepath.Dir(src), target)
+ }
+ return cp.walkMount(dest, target, maxSymlinks-1, true)
+ }
+
+ // If src is a regular directory, append it to cp.dirs and
+ // walk each of its children. (If there are no children,
+ // create an empty file "dest/.keep".)
+ if fi.Mode().IsDir() {
+ if dest != "" {
+ cp.dirs = append(cp.dirs, dest)
+ }
+ dir, err := os.Open(hostsrc)
+ if err != nil {
+ return fmt.Errorf("open %q: %s", src, err)
+ }
+ names, err := dir.Readdirnames(-1)
+ dir.Close()
+ if err != nil {
+ return fmt.Errorf("readdirnames %q: %s", src, err)
+ }
+ if len(names) == 0 {
+ if dest != "" {
+ cp.files = append(cp.files, filetodo{
+ src: os.DevNull,
+ dst: dest + "/.keep",
+ })
+ }
+ return nil
+ }
+ sort.Strings(names)
+ for _, name := range names {
+ dest, src := dest+"/"+name, src+"/"+name
+ if _, isSecret := cp.secretMounts[src]; isSecret {
+ continue
+ }
+ if mntinfo, isMount := cp.mounts[src]; isMount && mntinfo.Kind != "text" && mntinfo.Kind != "json" {
+ // If a regular file/dir somehow
+ // exists at a path that's also a
+ // mount target, ignore the file --
+ // the mount has already been included
+ // with walkMountsBelow().
+ //
+ // (...except json/text mounts, which
+ // are handled as regular files.)
+ continue
+ }
+ err = cp.walkHostFS(dest, src, maxSymlinks, false)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+ }
+
+ // If src is a regular file, append it to cp.files.
+ if fi.Mode().IsRegular() {
+ cp.files = append(cp.files, filetodo{
+ src: hostsrc,
+ dst: dest,
+ size: fi.Size(),
+ })
+ return nil
+ }
+
+ return fmt.Errorf("Unsupported file type (mode %o) in output dir: %q", fi.Mode(), src)
+}
+
+// getManifest returns the manifest of the collection with the given
+// portable data hash, fetching the collection record from the API
+// server on first use and caching it in cp.manifestCache.
+func (cp *copier) getManifest(pdh string) (*manifest.Manifest, error) {
+ if mft, ok := cp.manifestCache[pdh]; ok {
+ return mft, nil
+ }
+ var coll arvados.Collection
+ err := cp.arvClient.Get("collections", pdh, nil, &coll)
+ if err != nil {
+ return nil, fmt.Errorf("error retrieving collection record for %q: %s", pdh, err)
+ }
+ mft := &manifest.Manifest{Text: coll.ManifestText}
+ if cp.manifestCache == nil {
+ cp.manifestCache = map[string]*manifest.Manifest{pdh: mft}
+ } else {
+ cp.manifestCache[pdh] = mft
+ }
+ return mft, nil
+}
diff --git a/services/crunch-run/copier_test.go b/services/crunch-run/copier_test.go
new file mode 100644
index 0000000..7b13ef9
--- /dev/null
+++ b/services/crunch-run/copier_test.go
@@ -0,0 +1,188 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+ "io"
+ "io/ioutil"
+ "os"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&copierSuite{})
+
+type copierSuite struct {
+ cp copier
+}
+
+func (s *copierSuite) SetUpTest(c *check.C) {
+ tmpdir, err := ioutil.TempDir("", "crunch-run.test.")
+ c.Assert(err, check.IsNil)
+ api, err := arvadosclient.MakeArvadosClient()
+ c.Assert(err, check.IsNil)
+ s.cp = copier{
+ client: arvados.NewClientFromEnv(),
+ arvClient: api,
+ hostOutputDir: tmpdir,
+ ctrOutputDir: "/ctr/outdir",
+ mounts: map[string]arvados.Mount{
+ "/ctr/outdir": {Kind: "tmp"},
+ },
+ secretMounts: map[string]arvados.Mount{
+ "/secret_text": {Kind: "text", Content: "xyzzy"},
+ },
+ }
+}
+
+func (s *copierSuite) TearDownTest(c *check.C) {
+ os.RemoveAll(s.cp.hostOutputDir)
+}
+
+func (s *copierSuite) TestEmptyOutput(c *check.C) {
+ err := s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+ c.Check(err, check.IsNil)
+ c.Check(s.cp.dirs, check.DeepEquals, []string(nil))
+ c.Check(len(s.cp.files), check.Equals, 0)
+}
+
+func (s *copierSuite) TestRegularFilesAndDirs(c *check.C) {
+ err := os.MkdirAll(s.cp.hostOutputDir+"/dir1/dir2/dir3", 0755)
+ c.Assert(err, check.IsNil)
+ s.writeFileInOutputDir(c, "dir1/foo", "foo")
+
+ err = s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+ c.Check(err, check.IsNil)
+ c.Check(s.cp.dirs, check.DeepEquals, []string{"/dir1", "/dir1/dir2", "/dir1/dir2/dir3"})
+ c.Check(s.cp.files, check.DeepEquals, []filetodo{
+ {src: os.DevNull, dst: "/dir1/dir2/dir3/.keep"},
+ {src: s.cp.hostOutputDir + "/dir1/foo", dst: "/dir1/foo", size: 3},
+ })
+}
+
+func (s *copierSuite) TestSymlinkCycle(c *check.C) {
+ c.Assert(os.Mkdir(s.cp.hostOutputDir+"/dir1", 0755), check.IsNil)
+ c.Assert(os.Mkdir(s.cp.hostOutputDir+"/dir2", 0755), check.IsNil)
+ c.Assert(os.Symlink("../dir2", s.cp.hostOutputDir+"/dir1/l_dir2"), check.IsNil)
+ c.Assert(os.Symlink("../dir1", s.cp.hostOutputDir+"/dir2/l_dir1"), check.IsNil)
+ err := s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+ c.Check(err, check.ErrorMatches, `.*cycle.*`)
+}
+
+func (s *copierSuite) TestSymlinkTargetMissing(c *check.C) {
+ c.Assert(os.Symlink("./missing", s.cp.hostOutputDir+"/symlink"), check.IsNil)
+ err := s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+ c.Check(err, check.ErrorMatches, `.*/ctr/outdir/missing.*`)
+}
+
+func (s *copierSuite) TestSymlinkTargetNotMounted(c *check.C) {
+ c.Assert(os.Symlink("../boop", s.cp.hostOutputDir+"/symlink"), check.IsNil)
+ err := s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+ c.Check(err, check.ErrorMatches, `.*/ctr/boop.*`)
+}
+
+func (s *copierSuite) TestSymlinkToSecret(c *check.C) {
+ c.Assert(os.Symlink("/secret_text", s.cp.hostOutputDir+"/symlink"), check.IsNil)
+ err := s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+ c.Check(err, check.IsNil)
+ c.Check(len(s.cp.dirs), check.Equals, 0)
+ c.Check(len(s.cp.files), check.Equals, 0)
+}
+
+func (s *copierSuite) TestSecretInOutputDir(c *check.C) {
+ s.cp.secretMounts["/ctr/outdir/secret_text"] = s.cp.secretMounts["/secret_text"]
+ s.writeFileInOutputDir(c, "secret_text", "xyzzy")
+ err := s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+ c.Check(err, check.IsNil)
+ c.Check(len(s.cp.dirs), check.Equals, 0)
+ c.Check(len(s.cp.files), check.Equals, 0)
+}
+
+func (s *copierSuite) TestSymlinkToMountedCollection(c *check.C) {
+ s.cp.mounts["/mnt"] = arvados.Mount{
+ Kind: "collection",
+ PortableDataHash: arvadostest.FooPdh,
+ }
+ c.Assert(os.Symlink("../../mnt", s.cp.hostOutputDir+"/l_dir"), check.IsNil)
+ c.Assert(os.Symlink("/mnt/foo", s.cp.hostOutputDir+"/l_file"), check.IsNil)
+
+ err := s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+ c.Check(err, check.IsNil)
+ c.Check(s.cp.manifest, check.Matches, `(?ms)\./l_dir acbd\S+ 0:3:foo\n\. acbd\S+ 0:3:l_file\n`)
+}
+
+func (s *copierSuite) TestSymlink(c *check.C) {
+ hostfile := s.cp.hostOutputDir + "/dir1/file"
+
+ err := os.MkdirAll(s.cp.hostOutputDir+"/dir1/dir2/dir3", 0755)
+ c.Assert(err, check.IsNil)
+ s.writeFileInOutputDir(c, "dir1/file", "file")
+ for _, err := range []error{
+ os.Symlink(s.cp.ctrOutputDir+"/dir1/file", s.cp.hostOutputDir+"/l_abs_file"),
+ os.Symlink(s.cp.ctrOutputDir+"/dir1/dir2", s.cp.hostOutputDir+"/l_abs_dir2"),
+ os.Symlink("../../dir1/file", s.cp.hostOutputDir+"/dir1/dir2/l_rel_file"),
+ os.Symlink("dir1/file", s.cp.hostOutputDir+"/l_rel_file"),
+ os.MkdirAll(s.cp.hostOutputDir+"/morelinks", 0755),
+ os.Symlink("../dir1/dir2", s.cp.hostOutputDir+"/morelinks/l_rel_dir2"),
+ os.Symlink("dir1/dir2/dir3", s.cp.hostOutputDir+"/l_rel_dir3"),
+ } {
+ c.Assert(err, check.IsNil)
+ }
+
+ err = s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+ c.Check(err, check.IsNil)
+ c.Check(s.cp.dirs, check.DeepEquals, []string{
+ "/dir1", "/dir1/dir2", "/dir1/dir2/dir3",
+ "/l_abs_dir2", "/l_abs_dir2/dir3",
+ "/l_rel_dir3",
+ "/morelinks", "/morelinks/l_rel_dir2", "/morelinks/l_rel_dir2/dir3",
+ })
+ c.Check(s.cp.files, check.DeepEquals, []filetodo{
+ {dst: "/dir1/dir2/dir3/.keep", src: os.DevNull},
+ {dst: "/dir1/dir2/l_rel_file", src: hostfile, size: 4},
+ {dst: "/dir1/file", src: hostfile, size: 4},
+ {dst: "/l_abs_dir2/dir3/.keep", src: os.DevNull},
+ {dst: "/l_abs_dir2/l_rel_file", src: hostfile, size: 4},
+ {dst: "/l_abs_file", src: hostfile, size: 4},
+ {dst: "/l_rel_dir3/.keep", src: os.DevNull},
+ {dst: "/l_rel_file", src: hostfile, size: 4},
+ {dst: "/morelinks/l_rel_dir2/dir3/.keep", src: os.DevNull},
+ {dst: "/morelinks/l_rel_dir2/l_rel_file", src: hostfile, size: 4},
+ })
+}
+
+func (s *copierSuite) TestUnsupportedOutputMount(c *check.C) {
+ s.cp.mounts["/ctr/outdir"] = arvados.Mount{Kind: "waz"}
+ err := s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+ c.Check(err, check.ErrorMatches, `.*unsupported mount.*`)
+}
+
+func (s *copierSuite) TestUnsupportedMountKindBelow(c *check.C) {
+ s.cp.mounts["/ctr/outdir/dirk"] = arvados.Mount{Kind: "waz"}
+ err := s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+ c.Check(err, check.ErrorMatches, `.*unsupported mount.*`)
+}
+
+func (s *copierSuite) TestUnsupportedWritableMountBelow(c *check.C) {
+ s.cp.mounts["/ctr/outdir/dirk"] = arvados.Mount{
+ Kind: "collection",
+ PortableDataHash: arvadostest.FooPdh,
+ Writable: true,
+ }
+ err := s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+ c.Check(err, check.ErrorMatches, `.*writable collection mounted at.*`)
+}
+
+// writeFileInOutputDir writes data to path, relative to the host output dir.
+func (s *copierSuite) writeFileInOutputDir(c *check.C, path, data string) {
+ f, err := os.OpenFile(s.cp.hostOutputDir+"/"+path, os.O_CREATE|os.O_WRONLY, 0644)
+ c.Assert(err, check.IsNil)
+ _, err = io.WriteString(f, data)
+ c.Assert(err, check.IsNil)
+ c.Assert(f.Close(), check.IsNil)
+}
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 53815cb..aea5917 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -57,13 +57,14 @@ var ErrCancelled = errors.New("Cancelled")
// IKeepClient is the minimal Keep API methods used by crunch-run.
type IKeepClient interface {
- PutHB(hash string, buf []byte) (string, int, error)
+ PutB(buf []byte) (string, int, error)
+ ReadAt(locator string, p []byte, off int) (int, error)
ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
ClearBlockCache()
}
// NewLogWriter is a factory function to create a new log writer.
-type NewLogWriter func(name string) io.WriteCloser
+type NewLogWriter func(name string) (io.WriteCloser, error)
type RunArvMount func(args []string, tok string) (*exec.Cmd, error)
@@ -86,6 +87,7 @@ type ThinDockerClient interface {
// container.
type ContainerRunner struct {
Docker ThinDockerClient
+ client *arvados.Client
ArvClient IArvadosClient
Kc IKeepClient
arvados.Container
@@ -99,7 +101,7 @@ type ContainerRunner struct {
CrunchLog *ThrottledLogger
Stdout io.WriteCloser
Stderr io.WriteCloser
- LogCollection *CollectionWriter
+ LogCollection arvados.CollectionFileSystem
LogsPDH *string
RunArvMount
MkTempDir
@@ -275,7 +277,11 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (
}
c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token)
- runner.arvMountLog = NewThrottledLogger(runner.NewLogWriter("arv-mount"))
+ w, err := runner.NewLogWriter("arv-mount")
+ if err != nil {
+ return nil, err
+ }
+ runner.arvMountLog = NewThrottledLogger(w)
c.Stdout = runner.arvMountLog
c.Stderr = runner.arvMountLog
@@ -696,18 +702,27 @@ func (runner *ContainerRunner) stopHoststat() error {
return nil
}
-func (runner *ContainerRunner) startHoststat() {
- runner.hoststatLogger = NewThrottledLogger(runner.NewLogWriter("hoststat"))
+func (runner *ContainerRunner) startHoststat() error {
+ w, err := runner.NewLogWriter("hoststat")
+ if err != nil {
+ return err
+ }
+ runner.hoststatLogger = NewThrottledLogger(w)
runner.hoststatReporter = &crunchstat.Reporter{
Logger: log.New(runner.hoststatLogger, "", 0),
CgroupRoot: runner.cgroupRoot,
PollPeriod: runner.statInterval,
}
runner.hoststatReporter.Start()
+ return nil
}
-func (runner *ContainerRunner) startCrunchstat() {
- runner.statLogger = NewThrottledLogger(runner.NewLogWriter("crunchstat"))
+func (runner *ContainerRunner) startCrunchstat() error {
+ w, err := runner.NewLogWriter("crunchstat")
+ if err != nil {
+ return err
+ }
+ runner.statLogger = NewThrottledLogger(w)
runner.statReporter = &crunchstat.Reporter{
CID: runner.ContainerID,
Logger: log.New(runner.statLogger, "", 0),
@@ -716,6 +731,7 @@ func (runner *ContainerRunner) startCrunchstat() {
PollPeriod: runner.statInterval,
}
runner.statReporter.Start()
+ return nil
}
type infoCommand struct {
@@ -729,7 +745,10 @@ type infoCommand struct {
// might differ from what's described in the node record (see
// LogNodeRecord).
func (runner *ContainerRunner) LogHostInfo() (err error) {
- w := runner.NewLogWriter("node-info")
+ w, err := runner.NewLogWriter("node-info")
+ if err != nil {
+ return
+ }
commands := []infoCommand{
{
@@ -802,11 +821,15 @@ func (runner *ContainerRunner) LogNodeRecord() error {
}
func (runner *ContainerRunner) logAPIResponse(label, path string, params map[string]interface{}, munge func(interface{})) (logged bool, err error) {
+ writer, err := runner.LogCollection.OpenFile(label+".json", os.O_CREATE|os.O_WRONLY, 0666)
+ if err != nil {
+ return false, err
+ }
w := &ArvLogWriter{
ArvClient: runner.ArvClient,
UUID: runner.Container.UUID,
loggingStream: label,
- writeCloser: runner.LogCollection.Open(label + ".json"),
+ writeCloser: writer,
}
reader, err := runner.ArvClient.CallRaw("GET", path, "", "", arvadosclient.Dict(params))
@@ -893,8 +916,10 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
return err
}
runner.Stdout = stdoutFile
+ } else if w, err := runner.NewLogWriter("stdout"); err != nil {
+ return err
} else {
- runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
+ runner.Stdout = NewThrottledLogger(w)
}
if stderrMnt, ok := runner.Container.Mounts["stderr"]; ok {
@@ -903,8 +928,10 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
return err
}
runner.Stderr = stderrFile
+ } else if w, err := runner.NewLogWriter("stderr"); err != nil {
+ return err
} else {
- runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
+ runner.Stderr = NewThrottledLogger(w)
}
if stdinRdr != nil {
@@ -1075,181 +1102,8 @@ func (runner *ContainerRunner) WaitFinish() error {
}
}
-var ErrNotInOutputDir = fmt.Errorf("Must point to path within the output directory")
-
-func (runner *ContainerRunner) derefOutputSymlink(path string, startinfo os.FileInfo) (tgt string, readlinktgt string, info os.FileInfo, err error) {
- // Follow symlinks if necessary
- info = startinfo
- tgt = path
- readlinktgt = ""
- nextlink := path
- for followed := 0; info.Mode()&os.ModeSymlink != 0; followed++ {
- if followed >= limitFollowSymlinks {
- // Got stuck in a loop or just a pathological number of links, give up.
- err = fmt.Errorf("Followed more than %v symlinks from path %q", limitFollowSymlinks, path)
- return
- }
-
- readlinktgt, err = os.Readlink(nextlink)
- if err != nil {
- return
- }
-
- tgt = readlinktgt
- if !strings.HasPrefix(tgt, "/") {
- // Relative symlink, resolve it to host path
- tgt = filepath.Join(filepath.Dir(path), tgt)
- }
- if strings.HasPrefix(tgt, runner.Container.OutputPath+"/") && !strings.HasPrefix(tgt, runner.HostOutputDir+"/") {
- // Absolute symlink to container output path, adjust it to host output path.
- tgt = filepath.Join(runner.HostOutputDir, tgt[len(runner.Container.OutputPath):])
- }
- if !strings.HasPrefix(tgt, runner.HostOutputDir+"/") {
- // After dereferencing, symlink target must either be
- // within output directory, or must point to a
- // collection mount.
- err = ErrNotInOutputDir
- return
- }
-
- info, err = os.Lstat(tgt)
- if err != nil {
- // tgt
- err = fmt.Errorf("Symlink in output %q points to invalid location %q: %v",
- path[len(runner.HostOutputDir):], readlinktgt, err)
- return
- }
-
- nextlink = tgt
- }
-
- return
-}
-
-var limitFollowSymlinks = 10
-
-// UploadFile uploads files within the output directory, with special handling
-// for symlinks. If the symlink leads to a keep mount, copy the manifest text
-// from the keep mount into the output manifestText. Ensure that whether
-// symlinks are relative or absolute, every symlink target (even targets that
-// are symlinks themselves) must point to a path in either the output directory
-// or a collection mount.
-//
-// Assumes initial value of "path" is absolute, and located within runner.HostOutputDir.
-func (runner *ContainerRunner) UploadOutputFile(
- path string,
- info os.FileInfo,
- infoerr error,
- binds []string,
- walkUpload *WalkUpload,
- relocateFrom string,
- relocateTo string,
- followed int) (manifestText string, err error) {
-
- if infoerr != nil {
- return "", infoerr
- }
-
- if info.Mode().IsDir() {
- // if empty, need to create a .keep file
- dir, direrr := os.Open(path)
- if direrr != nil {
- return "", direrr
- }
- defer dir.Close()
- names, eof := dir.Readdirnames(1)
- if len(names) == 0 && eof == io.EOF && path != runner.HostOutputDir {
- containerPath := runner.OutputPath + path[len(runner.HostOutputDir):]
- for _, bind := range binds {
- mnt := runner.Container.Mounts[bind]
- // Check if there is a bind for this
- // directory, in which case assume we don't need .keep
- if (containerPath == bind || strings.HasPrefix(containerPath, bind+"/")) && mnt.PortableDataHash != "d41d8cd98f00b204e9800998ecf8427e+0" {
- return
- }
- }
- outputSuffix := path[len(runner.HostOutputDir)+1:]
- return fmt.Sprintf("./%v d41d8cd98f00b204e9800998ecf8427e+0 0:0:.keep\n", outputSuffix), nil
- }
- return
- }
-
- if followed >= limitFollowSymlinks {
- // Got stuck in a loop or just a pathological number of
- // directory links, give up.
- err = fmt.Errorf("Followed more than %v symlinks from path %q", limitFollowSymlinks, path)
- return
- }
-
- // "path" is the actual path we are visiting
- // "tgt" is the target of "path" (a non-symlink) after following symlinks
- // "relocated" is the path in the output manifest where the file should be placed,
- // but has HostOutputDir as a prefix.
-
- // The destination path in the output manifest may need to be
- // logically relocated to some other path in order to appear
- // in the correct location as a result of following a symlink.
- // Remove the relocateFrom prefix and replace it with
- // relocateTo.
- relocated := relocateTo + path[len(relocateFrom):]
-
- tgt, readlinktgt, info, derefErr := runner.derefOutputSymlink(path, info)
- if derefErr != nil && derefErr != ErrNotInOutputDir {
- return "", derefErr
- }
-
- // go through mounts and try reverse map to collection reference
- for _, bind := range binds {
- mnt := runner.Container.Mounts[bind]
- if (tgt == bind || strings.HasPrefix(tgt, bind+"/")) && !mnt.Writable {
- // get path relative to bind
- targetSuffix := tgt[len(bind):]
-
- // Copy mount and adjust the path to add path relative to the bind
- adjustedMount := mnt
- adjustedMount.Path = filepath.Join(adjustedMount.Path, targetSuffix)
-
- // Terminates in this keep mount, so add the
- // manifest text at appropriate location.
- outputSuffix := relocated[len(runner.HostOutputDir):]
- manifestText, err = runner.getCollectionManifestForPath(adjustedMount, outputSuffix)
- return
- }
- }
-
- // If target is not a collection mount, it must be located within the
- // output directory, otherwise it is an error.
- if derefErr == ErrNotInOutputDir {
- err = fmt.Errorf("Symlink in output %q points to invalid location %q, must point to path within the output directory.",
- path[len(runner.HostOutputDir):], readlinktgt)
- return
- }
-
- if info.Mode().IsRegular() {
- return "", walkUpload.UploadFile(relocated, tgt)
- }
-
- if info.Mode().IsDir() {
- // Symlink leads to directory. Walk() doesn't follow
- // directory symlinks, so we walk the target directory
- // instead. Within the walk, file paths are relocated
- // so they appear under the original symlink path.
- err = filepath.Walk(tgt, func(walkpath string, walkinfo os.FileInfo, walkerr error) error {
- var m string
- m, walkerr = runner.UploadOutputFile(walkpath, walkinfo, walkerr,
- binds, walkUpload, tgt, relocated, followed+1)
- if walkerr == nil {
- manifestText = manifestText + m
- }
- return walkerr
- })
- return
- }
-
- return
-}
-
-// HandleOutput sets the output, unmounts the FUSE mount, and deletes temporary directories
+// CaptureOutput saves data from the container's output directory if
+// needed, and updates the container output accordingly.
func (runner *ContainerRunner) CaptureOutput() error {
if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
// Output may have been set directly by the container, so
@@ -1266,163 +1120,35 @@ func (runner *ContainerRunner) CaptureOutput() error {
}
}
- if runner.HostOutputDir == "" {
- return nil
- }
-
- _, err := os.Stat(runner.HostOutputDir)
+ txt, err := (&copier{
+ client: runner.client,
+ arvClient: runner.ArvClient,
+ keepClient: runner.Kc,
+ hostOutputDir: runner.HostOutputDir,
+ ctrOutputDir: runner.Container.OutputPath,
+ mounts: runner.Container.Mounts,
+ secretMounts: runner.SecretMounts,
+ logger: runner.CrunchLog,
+ }).Copy()
if err != nil {
- return fmt.Errorf("While checking host output path: %v", err)
- }
-
- // Pre-populate output from the configured mount points
- var binds []string
- for bind, mnt := range runner.Container.Mounts {
- if mnt.Kind == "collection" {
- binds = append(binds, bind)
- }
- }
- sort.Strings(binds)
-
- // Delete secret mounts so they don't get saved to the output collection.
- for bind := range runner.SecretMounts {
- if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
- err = os.Remove(runner.HostOutputDir + bind[len(runner.Container.OutputPath):])
- if err != nil {
- return fmt.Errorf("Unable to remove secret mount: %v", err)
- }
- }
- }
-
- var manifestText string
-
- collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir)
- _, err = os.Stat(collectionMetafile)
- if err != nil {
- // Regular directory
-
- cw := CollectionWriter{0, runner.Kc, nil, nil, sync.Mutex{}}
- walkUpload := cw.BeginUpload(runner.HostOutputDir, runner.CrunchLog.Logger)
-
- var m string
- err = filepath.Walk(runner.HostOutputDir, func(path string, info os.FileInfo, err error) error {
- m, err = runner.UploadOutputFile(path, info, err, binds, walkUpload, "", "", 0)
- if err == nil {
- manifestText = manifestText + m
- }
- return err
- })
-
- cw.EndUpload(walkUpload)
-
- if err != nil {
- return fmt.Errorf("While uploading output files: %v", err)
- }
-
- m, err = cw.ManifestText()
- manifestText = manifestText + m
- if err != nil {
- return fmt.Errorf("While uploading output files: %v", err)
- }
- } else {
- // FUSE mount directory
- file, openerr := os.Open(collectionMetafile)
- if openerr != nil {
- return fmt.Errorf("While opening FUSE metafile: %v", err)
- }
- defer file.Close()
-
- var rec arvados.Collection
- err = json.NewDecoder(file).Decode(&rec)
- if err != nil {
- return fmt.Errorf("While reading FUSE metafile: %v", err)
- }
- manifestText = rec.ManifestText
- }
-
- for _, bind := range binds {
- mnt := runner.Container.Mounts[bind]
-
- bindSuffix := strings.TrimPrefix(bind, runner.Container.OutputPath)
-
- if bindSuffix == bind || len(bindSuffix) <= 0 {
- // either does not start with OutputPath or is OutputPath itself
- continue
- }
-
- if mnt.ExcludeFromOutput == true || mnt.Writable {
- continue
- }
-
- // append to manifest_text
- m, err := runner.getCollectionManifestForPath(mnt, bindSuffix)
- if err != nil {
- return err
- }
-
- manifestText = manifestText + m
- }
-
- // Save output
- var response arvados.Collection
- manifest := manifest.Manifest{Text: manifestText}
- manifestText = manifest.Extract(".", ".").Text
- err = runner.ArvClient.Create("collections",
- arvadosclient.Dict{
- "ensure_unique_name": true,
- "collection": arvadosclient.Dict{
- "is_trashed": true,
- "name": "output for " + runner.Container.UUID,
- "manifest_text": manifestText}},
- &response)
+ return err
+ }
+ var resp arvados.Collection
+ err = runner.ArvClient.Create("collections", arvadosclient.Dict{
+ "ensure_unique_name": true,
+ "collection": arvadosclient.Dict{
+ "is_trashed": true,
+ "name": "output for " + runner.Container.UUID,
+ "manifest_text": txt,
+ },
+ }, &resp)
if err != nil {
- return fmt.Errorf("While creating output collection: %v", err)
+ return fmt.Errorf("error creating output collection: %v", err)
}
- runner.OutputPDH = &response.PortableDataHash
+ runner.OutputPDH = &resp.PortableDataHash
return nil
}
-var outputCollections = make(map[string]arvados.Collection)
-
-// Fetch the collection for the mnt.PortableDataHash
-// Return the manifest_text fragment corresponding to the specified mnt.Path
-// after making any required updates.
-// Ex:
-// If mnt.Path is not specified,
-// return the entire manifest_text after replacing any "." with bindSuffix
-// If mnt.Path corresponds to one stream,
-// return the manifest_text for that stream after replacing that stream name with bindSuffix
-// Otherwise, check if a filename in any one stream is being sought. Return the manifest_text
-// for that stream after replacing stream name with bindSuffix minus the last word
-// and the file name with last word of the bindSuffix
-// Allowed path examples:
-// "path":"/"
-// "path":"/subdir1"
-// "path":"/subdir1/subdir2"
-// "path":"/subdir/filename" etc
-func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, bindSuffix string) (string, error) {
- collection := outputCollections[mnt.PortableDataHash]
- if collection.PortableDataHash == "" {
- err := runner.ArvClient.Get("collections", mnt.PortableDataHash, nil, &collection)
- if err != nil {
- return "", fmt.Errorf("While getting collection for %v: %v", mnt.PortableDataHash, err)
- }
- outputCollections[mnt.PortableDataHash] = collection
- }
-
- if collection.ManifestText == "" {
- runner.CrunchLog.Printf("No manifest text for collection %v", collection.PortableDataHash)
- return "", nil
- }
-
- mft := manifest.Manifest{Text: collection.ManifestText}
- extracted := mft.Extract(mnt.Path, bindSuffix)
- if extracted.Err != nil {
- return "", fmt.Errorf("Error parsing manifest for %v: %v", mnt.PortableDataHash, extracted.Err.Error())
- }
- return extracted.Text, nil
-}
-
func (runner *ContainerRunner) CleanupDirs() {
if runner.ArvMount != nil {
var delay int64 = 8
@@ -1495,8 +1221,12 @@ func (runner *ContainerRunner) CommitLogs() error {
// point, but re-open crunch log with ArvClient in case there are any
// other further errors (such as failing to write the log to Keep!)
// while shutting down
- runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient,
- UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil})
+ runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{
+ ArvClient: runner.ArvClient,
+ UUID: runner.Container.UUID,
+ loggingStream: "crunch-run",
+ writeCloser: nil,
+ })
runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
}()
@@ -1509,7 +1239,7 @@ func (runner *ContainerRunner) CommitLogs() error {
return nil
}
- mt, err := runner.LogCollection.ManifestText()
+ mt, err := runner.LogCollection.MarshalManifest(".")
if err != nil {
return fmt.Errorf("While creating log manifest: %v", err)
}
@@ -1584,12 +1314,17 @@ func (runner *ContainerRunner) IsCancelled() bool {
}
// NewArvLogWriter creates an ArvLogWriter
-func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
+func (runner *ContainerRunner) NewArvLogWriter(name string) (io.WriteCloser, error) {
+ writer, err := runner.LogCollection.OpenFile(name+".txt", os.O_CREATE|os.O_WRONLY, 0666)
+ if err != nil {
+ return nil, err
+ }
return &ArvLogWriter{
ArvClient: runner.ArvClient,
UUID: runner.Container.UUID,
loggingStream: name,
- writeCloser: runner.LogCollection.Open(name + ".txt")}
+ writeCloser: writer,
+ }, nil
}
// Run the full container lifecycle.
@@ -1658,7 +1393,10 @@ func (runner *ContainerRunner) Run() (err error) {
return
}
runner.setupSignals()
- runner.startHoststat()
+ err = runner.startHoststat()
+ if err != nil {
+ return
+ }
// check for and/or load image
err = runner.LoadImage()
@@ -1707,7 +1445,10 @@ func (runner *ContainerRunner) Run() (err error) {
}
runner.finalState = "Cancelled"
- runner.startCrunchstat()
+ err = runner.startCrunchstat()
+ if err != nil {
+ return
+ }
err = runner.StartContainer()
if err != nil {
@@ -1766,12 +1507,13 @@ func (runner *ContainerRunner) fetchContainerRecord() error {
}
// NewContainerRunner creates a new container runner.
-func NewContainerRunner(api IArvadosClient,
- kc IKeepClient,
- docker ThinDockerClient,
- containerUUID string) *ContainerRunner {
-
- cr := &ContainerRunner{ArvClient: api, Kc: kc, Docker: docker}
+func NewContainerRunner(client *arvados.Client, api IArvadosClient, kc IKeepClient, docker ThinDockerClient, containerUUID string) (*ContainerRunner, error) {
+ cr := &ContainerRunner{
+ client: client,
+ ArvClient: api,
+ Kc: kc,
+ Docker: docker,
+ }
cr.NewLogWriter = cr.NewArvLogWriter
cr.RunArvMount = cr.ArvMountCmd
cr.MkTempDir = ioutil.TempDir
@@ -1783,14 +1525,22 @@ func NewContainerRunner(api IArvadosClient,
cl.ApiToken = token
return cl, nil
}
- cr.LogCollection = &CollectionWriter{0, kc, nil, nil, sync.Mutex{}}
+ var err error
+ cr.LogCollection, err = (&arvados.Collection{}).FileSystem(cr.client, cr.Kc)
+ if err != nil {
+ return nil, err
+ }
cr.Container.UUID = containerUUID
- cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
+ w, err := cr.NewLogWriter("crunch-run")
+ if err != nil {
+ return nil, err
+ }
+ cr.CrunchLog = NewThrottledLogger(w)
cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
loadLogThrottleParams(api)
- return cr
+ return cr, nil
}
func main() {
@@ -1842,7 +1592,10 @@ func main() {
// minimum version we want to support.
docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
- cr := NewContainerRunner(api, kc, docker, containerId)
+ cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, docker, containerId)
+ if err != nil {
+ log.Fatal(err)
+ }
if dockererr != nil {
cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
cr.checkBrokenNode(dockererr)
@@ -1872,15 +1625,15 @@ func main() {
if *memprofile != "" {
f, err := os.Create(*memprofile)
if err != nil {
- log.Printf("could not create memory profile: ", err)
+ log.Printf("could not create memory profile: %s", err)
}
runtime.GC() // get up-to-date statistics
if err := pprof.WriteHeapProfile(f); err != nil {
- log.Printf("could not write memory profile: ", err)
+ log.Printf("could not write memory profile: %s", err)
}
closeerr := f.Close()
if closeerr != nil {
- log.Printf("closing memprofile file: ", err)
+ log.Printf("closing memprofile file: %s", closeerr)
}
}
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index ba91959..8ee4625 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/services/crunch-run/crunchrun_test.go
@@ -13,7 +13,6 @@ import (
"fmt"
"io"
"io/ioutil"
- "log"
"net"
"os"
"os/exec"
@@ -46,10 +45,12 @@ func TestCrunchExec(t *testing.T) {
var _ = Suite(&TestSuite{})
type TestSuite struct {
+ client *arvados.Client
docker *TestDockerClient
}
func (s *TestSuite) SetUpTest(c *C) {
+ s.client = arvados.NewClientFromEnv()
s.docker = NewTestDockerClient()
}
@@ -356,12 +357,16 @@ call:
return nil
}
-func (client *KeepTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+func (client *KeepTestClient) PutB(buf []byte) (string, int, error) {
client.Content = buf
- return fmt.Sprintf("%s+%d", hash, len(buf)), len(buf), nil
+ return fmt.Sprintf("%x+%d", md5.Sum(buf), len(buf)), len(buf), nil
}
-func (*KeepTestClient) ClearBlockCache() {
+func (client *KeepTestClient) ReadAt(string, []byte, int) (int, error) {
+ return 0, errors.New("not implemented")
+}
+
+func (client *KeepTestClient) ClearBlockCache() {
}
func (client *KeepTestClient) Close() {
@@ -413,9 +418,10 @@ func (client *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename s
func (s *TestSuite) TestLoadImage(c *C) {
kc := &KeepTestClient{}
defer kc.Close()
- cr := NewContainerRunner(&ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
- _, err := cr.Docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
+ _, err = cr.Docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
c.Check(err, IsNil)
_, _, err = cr.Docker.ImageInspectWithRaw(nil, hwImageId)
@@ -480,26 +486,24 @@ func (ArvErrorTestClient) Discovery(key string) (interface{}, error) {
return discoveryMap[key], nil
}
-type KeepErrorTestClient struct{}
-
-func (KeepErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
- return "", 0, errors.New("KeepError")
+type KeepErrorTestClient struct {
+ KeepTestClient
}
-func (KeepErrorTestClient) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) {
+func (*KeepErrorTestClient) ManifestFileReader(manifest.Manifest, string) (arvados.File, error) {
return nil, errors.New("KeepError")
}
-func (KeepErrorTestClient) ClearBlockCache() {
+func (*KeepErrorTestClient) PutB(buf []byte) (string, int, error) {
+ return "", 0, errors.New("KeepError")
}
-type KeepReadErrorTestClient struct{}
-
-func (KeepReadErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
- return "", 0, nil
+type KeepReadErrorTestClient struct {
+ KeepTestClient
}
-func (KeepReadErrorTestClient) ClearBlockCache() {
+func (*KeepReadErrorTestClient) ReadAt(string, []byte, int) (int, error) {
+ return 0, errors.New("KeepError")
}
type ErrorReader struct {
@@ -522,37 +526,42 @@ func (s *TestSuite) TestLoadImageArvError(c *C) {
// (1) Arvados error
kc := &KeepTestClient{}
defer kc.Close()
- cr := NewContainerRunner(ArvErrorTestClient{}, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, ArvErrorTestClient{}, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
cr.Container.ContainerImage = hwPDH
- err := cr.LoadImage()
+ err = cr.LoadImage()
c.Check(err.Error(), Equals, "While getting container image collection: ArvError")
}
func (s *TestSuite) TestLoadImageKeepError(c *C) {
// (2) Keep error
- cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, &ArvTestClient{}, &KeepErrorTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
cr.Container.ContainerImage = hwPDH
- err := cr.LoadImage()
+ err = cr.LoadImage()
+ c.Assert(err, NotNil)
c.Check(err.Error(), Equals, "While creating ManifestFileReader for container image: KeepError")
}
func (s *TestSuite) TestLoadImageCollectionError(c *C) {
// (3) Collection doesn't contain image
- cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, &ArvTestClient{}, &KeepReadErrorTestClient{}, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
cr.Container.ContainerImage = otherPDH
- err := cr.LoadImage()
+ err = cr.LoadImage()
c.Check(err.Error(), Equals, "First file in the container image collection does not end in .tar")
}
func (s *TestSuite) TestLoadImageKeepReadError(c *C) {
// (4) Collection doesn't contain image
- cr := NewContainerRunner(&ArvTestClient{}, KeepReadErrorTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, &ArvTestClient{}, &KeepReadErrorTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
cr.Container.ContainerImage = hwPDH
- err := cr.LoadImage()
+ err = cr.LoadImage()
c.Check(err, NotNil)
}
@@ -569,14 +578,14 @@ type TestLogs struct {
Stderr ClosableBuffer
}
-func (tl *TestLogs) NewTestLoggingWriter(logstr string) io.WriteCloser {
+func (tl *TestLogs) NewTestLoggingWriter(logstr string) (io.WriteCloser, error) {
if logstr == "stdout" {
- return &tl.Stdout
+ return &tl.Stdout, nil
}
if logstr == "stderr" {
- return &tl.Stderr
+ return &tl.Stderr, nil
}
- return nil
+ return nil, fmt.Errorf("no test log writer for stream %q", logstr)
}
func dockerLog(fd byte, msg string) []byte {
@@ -595,13 +604,14 @@ func (s *TestSuite) TestRunContainer(c *C) {
}
kc := &KeepTestClient{}
defer kc.Close()
- cr := NewContainerRunner(&ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
var logs TestLogs
cr.NewLogWriter = logs.NewTestLoggingWriter
cr.Container.ContainerImage = hwPDH
cr.Container.Command = []string{"./hw"}
- err := cr.LoadImage()
+ err = cr.LoadImage()
c.Check(err, IsNil)
err = cr.CreateContainer()
@@ -621,14 +631,15 @@ func (s *TestSuite) TestCommitLogs(c *C) {
api := &ArvTestClient{}
kc := &KeepTestClient{}
defer kc.Close()
- cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
cr.CrunchLog.Print("Hello world!")
cr.CrunchLog.Print("Goodbye")
cr.finalState = "Complete"
- err := cr.CommitLogs()
+ err = cr.CommitLogs()
c.Check(err, IsNil)
c.Check(api.Calls, Equals, 2)
@@ -642,9 +653,10 @@ func (s *TestSuite) TestUpdateContainerRunning(c *C) {
api := &ArvTestClient{}
kc := &KeepTestClient{}
defer kc.Close()
- cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
- err := cr.UpdateContainerRunning()
+ err = cr.UpdateContainerRunning()
c.Check(err, IsNil)
c.Check(api.Content[0]["container"].(arvadosclient.Dict)["state"], Equals, "Running")
@@ -654,7 +666,8 @@ func (s *TestSuite) TestUpdateContainerComplete(c *C) {
api := &ArvTestClient{}
kc := &KeepTestClient{}
defer kc.Close()
- cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
cr.LogsPDH = new(string)
*cr.LogsPDH = "d3a229d2fe3690c2c3e75a71a153c6a3+60"
@@ -663,7 +676,7 @@ func (s *TestSuite) TestUpdateContainerComplete(c *C) {
*cr.ExitCode = 42
cr.finalState = "Complete"
- err := cr.UpdateContainerFinal()
+ err = cr.UpdateContainerFinal()
c.Check(err, IsNil)
c.Check(api.Content[0]["container"].(arvadosclient.Dict)["log"], Equals, *cr.LogsPDH)
@@ -675,11 +688,12 @@ func (s *TestSuite) TestUpdateContainerCancelled(c *C) {
api := &ArvTestClient{}
kc := &KeepTestClient{}
defer kc.Close()
- cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
cr.cCancelled = true
cr.finalState = "Cancelled"
- err := cr.UpdateContainerFinal()
+ err = cr.UpdateContainerFinal()
c.Check(err, IsNil)
c.Check(api.Content[0]["container"].(arvadosclient.Dict)["log"], IsNil)
@@ -700,7 +714,7 @@ func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, exi
err = json.Unmarshal([]byte(record), &sm)
c.Check(err, IsNil)
secretMounts, err := json.Marshal(sm)
- log.Printf("%q %q", sm, secretMounts)
+ c.Logf("%s %q", sm, secretMounts)
c.Check(err, IsNil)
s.docker.exitCode = exitCode
@@ -711,7 +725,8 @@ func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, exi
s.docker.api = api
kc := &KeepTestClient{}
defer kc.Close()
- cr = NewContainerRunner(api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err = NewContainerRunner(s.client, api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
cr.statInterval = 100 * time.Millisecond
am := &ArvMountCmdLine{}
cr.RunArvMount = am.ArvMountTest
@@ -989,7 +1004,8 @@ func (s *TestSuite) testStopContainer(c *C, setup func(cr *ContainerRunner)) {
api := &ArvTestClient{Container: rec}
kc := &KeepTestClient{}
defer kc.Close()
- cr := NewContainerRunner(api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
cr.RunArvMount = func([]string, string) (*exec.Cmd, error) { return nil, nil }
cr.MkArvClient = func(token string) (IArvadosClient, error) {
return &ArvTestClient{}, nil
@@ -1060,7 +1076,8 @@ func (s *TestSuite) TestSetupMounts(c *C) {
api := &ArvTestClient{}
kc := &KeepTestClient{}
defer kc.Close()
- cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
am := &ArvMountCmdLine{}
cr.RunArvMount = am.ArvMountTest
@@ -1470,7 +1487,8 @@ func (s *TestSuite) stdoutErrorRunHelper(c *C, record string, fn func(t *TestDoc
api = &ArvTestClient{Container: rec}
kc := &KeepTestClient{}
defer kc.Close()
- cr = NewContainerRunner(api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err = NewContainerRunner(s.client, api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
am := &ArvMountCmdLine{}
cr.RunArvMount = am.ArvMountTest
cr.MkArvClient = func(token string) (IArvadosClient, error) {
@@ -1512,8 +1530,8 @@ func (s *TestSuite) TestStdoutWithWrongKindCollection(c *C) {
}
func (s *TestSuite) TestFullRunWithAPI(c *C) {
+ defer os.Setenv("ARVADOS_API_HOST", os.Getenv("ARVADOS_API_HOST"))
os.Setenv("ARVADOS_API_HOST", "test.arvados.org")
- defer os.Unsetenv("ARVADOS_API_HOST")
api, _, _ := s.fullRunHelper(c, `{
"command": ["/bin/sh", "-c", "echo $ARVADOS_API_HOST"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
@@ -1535,8 +1553,8 @@ func (s *TestSuite) TestFullRunWithAPI(c *C) {
}
func (s *TestSuite) TestFullRunSetOutput(c *C) {
+ defer os.Setenv("ARVADOS_API_HOST", os.Getenv("ARVADOS_API_HOST"))
os.Setenv("ARVADOS_API_HOST", "test.arvados.org")
- defer os.Unsetenv("ARVADOS_API_HOST")
api, _, _ := s.fullRunHelper(c, `{
"command": ["/bin/sh", "-c", "echo $ARVADOS_API_HOST"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
@@ -1634,7 +1652,7 @@ func (s *TestSuite) TestStdoutWithMultipleMountPointsUnderOutputDir(c *C) {
manifest := collection["manifest_text"].(string)
c.Check(manifest, Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
-./foo 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0abcdefgh11234567890@569fa8c3 9:18:bar 9:18:sub1file2
+./foo 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0abcdefgh11234567890@569fa8c3 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396cabcdefghij6419876543234@569fa8c4 9:18:bar 36:18:sub1file2
./foo/baz 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0bcdefghijk544332211@569fa8c5 9:18:sub2file2
./foo/sub1 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396cabcdefghij6419876543234@569fa8c4 0:9:file1_in_subdir1.txt 9:18:file2_in_subdir1.txt
./foo/sub1/subdir2 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0bcdefghijk544332211@569fa8c5 0:9:file1_in_subdir2.txt 9:18:file2_in_subdir2.txt
@@ -1652,7 +1670,7 @@ func (s *TestSuite) TestStdoutWithMountPointsUnderOutputDirDenormalizedManifest(
"environment": {"FROBIZ": "bilbo"},
"mounts": {
"/tmp": {"kind": "tmp"},
- "/tmp/foo/bar": {"kind": "collection", "portable_data_hash": "b0def87f80dd594d4675809e83bd4f15+367/subdir1/file2_in_subdir1.txt"},
+ "/tmp/foo/bar": {"kind": "collection", "portable_data_hash": "b0def87f80dd594d4675809e83bd4f15+367", "path": "/subdir1/file2_in_subdir1.txt"},
"stdout": {"kind": "file", "path": "/tmp/a/b/c.out"}
},
"output_path": "/tmp",
@@ -1685,52 +1703,6 @@ func (s *TestSuite) TestStdoutWithMountPointsUnderOutputDirDenormalizedManifest(
}
}
-func (s *TestSuite) TestOutputSymlinkToInput(c *C) {
- helperRecord := `{
- "command": ["/bin/sh", "-c", "echo $FROBIZ"],
- "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
- "cwd": "/bin",
- "environment": {"FROBIZ": "bilbo"},
- "mounts": {
- "/tmp": {"kind": "tmp"},
- "/keep/foo/sub1file2": {"kind": "collection", "portable_data_hash": "a0def87f80dd594d4675809e83bd4f15+367", "path": "/subdir1/file2_in_subdir1.txt"},
- "/keep/foo2": {"kind": "collection", "portable_data_hash": "a0def87f80dd594d4675809e83bd4f15+367"}
- },
- "output_path": "/tmp",
- "priority": 1,
- "runtime_constraints": {}
- }`
-
- extraMounts := []string{
- "a0def87f80dd594d4675809e83bd4f15+367/subdir1/file2_in_subdir1.txt",
- }
-
- api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
- os.Symlink("/keep/foo/sub1file2", t.realTemp+"/tmp2/baz")
- os.Symlink("/keep/foo2/subdir1/file2_in_subdir1.txt", t.realTemp+"/tmp2/baz2")
- os.Symlink("/keep/foo2/subdir1", t.realTemp+"/tmp2/baz3")
- os.Mkdir(t.realTemp+"/tmp2/baz4", 0700)
- os.Symlink("/keep/foo2/subdir1/file2_in_subdir1.txt", t.realTemp+"/tmp2/baz4/baz5")
- t.logWriter.Close()
- })
-
- c.Check(api.CalledWith("container.exit_code", 0), NotNil)
- c.Check(api.CalledWith("container.state", "Complete"), NotNil)
- for _, v := range api.Content {
- if v["collection"] != nil {
- collection := v["collection"].(arvadosclient.Dict)
- if strings.Index(collection["name"].(string), "output") == 0 {
- manifest := collection["manifest_text"].(string)
- c.Check(manifest, Equals, `. 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396cabcdefghij6419876543234@569fa8c4 9:18:baz 9:18:baz2
-./baz3 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396cabcdefghij6419876543234@569fa8c4 0:9:file1_in_subdir1.txt 9:18:file2_in_subdir1.txt
-./baz3/subdir2 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0bcdefghijk544332211@569fa8c5 0:9:file1_in_subdir2.txt 9:18:file2_in_subdir2.txt
-./baz4 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396cabcdefghij6419876543234@569fa8c4 9:18:baz5
-`)
- }
- }
- }
-}
-
func (s *TestSuite) TestOutputError(c *C) {
helperRecord := `{
"command": ["/bin/sh", "-c", "echo $FROBIZ"],
@@ -1755,59 +1727,6 @@ func (s *TestSuite) TestOutputError(c *C) {
c.Check(api.CalledWith("container.state", "Cancelled"), NotNil)
}
-func (s *TestSuite) TestOutputSymlinkToOutput(c *C) {
- helperRecord := `{
- "command": ["/bin/sh", "-c", "echo $FROBIZ"],
- "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
- "cwd": "/bin",
- "environment": {"FROBIZ": "bilbo"},
- "mounts": {
- "/tmp": {"kind": "tmp"}
- },
- "output_path": "/tmp",
- "priority": 1,
- "runtime_constraints": {}
- }`
-
- extraMounts := []string{}
-
- api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
- rf, _ := os.Create(t.realTemp + "/tmp2/realfile")
- rf.Write([]byte("foo"))
- rf.Close()
-
- os.Mkdir(t.realTemp+"/tmp2/realdir", 0700)
- rf, _ = os.Create(t.realTemp + "/tmp2/realdir/subfile")
- rf.Write([]byte("bar"))
- rf.Close()
-
- os.Symlink("/tmp/realfile", t.realTemp+"/tmp2/file1")
- os.Symlink("realfile", t.realTemp+"/tmp2/file2")
- os.Symlink("/tmp/file1", t.realTemp+"/tmp2/file3")
- os.Symlink("file2", t.realTemp+"/tmp2/file4")
- os.Symlink("realdir", t.realTemp+"/tmp2/dir1")
- os.Symlink("/tmp/realdir", t.realTemp+"/tmp2/dir2")
- t.logWriter.Close()
- })
-
- c.Check(api.CalledWith("container.exit_code", 0), NotNil)
- c.Check(api.CalledWith("container.state", "Complete"), NotNil)
- for _, v := range api.Content {
- if v["collection"] != nil {
- collection := v["collection"].(arvadosclient.Dict)
- if strings.Index(collection["name"].(string), "output") == 0 {
- manifest := collection["manifest_text"].(string)
- c.Check(manifest, Equals,
- `. 7a2c86e102dcc231bd232aad99686dfa+15 0:3:file1 3:3:file2 6:3:file3 9:3:file4 12:3:realfile
-./dir1 37b51d194a7513e45b56f6524f2d51f2+3 0:3:subfile
-./dir2 37b51d194a7513e45b56f6524f2d51f2+3 0:3:subfile
-./realdir 37b51d194a7513e45b56f6524f2d51f2+3 0:3:subfile
-`)
- }
- }
- }
-}
-
func (s *TestSuite) TestStdinCollectionMountPoint(c *C) {
helperRecord := `{
"command": ["/bin/sh", "-c", "echo $FROBIZ"],
@@ -1911,7 +1830,8 @@ func (s *TestSuite) TestStderrMount(c *C) {
func (s *TestSuite) TestNumberRoundTrip(c *C) {
kc := &KeepTestClient{}
defer kc.Close()
- cr := NewContainerRunner(&ArvTestClient{callraw: true}, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, &ArvTestClient{callraw: true}, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
cr.fetchContainerRecord()
jsondata, err := json.Marshal(cr.Container.Mounts["/json"].Content)
@@ -1920,59 +1840,6 @@ func (s *TestSuite) TestNumberRoundTrip(c *C) {
c.Check(string(jsondata), Equals, `{"number":123456789123456789}`)
}
-func (s *TestSuite) TestEvalSymlinks(c *C) {
- kc := &KeepTestClient{}
- defer kc.Close()
- cr := NewContainerRunner(&ArvTestClient{callraw: true}, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
-
- realTemp, err := ioutil.TempDir("", "crunchrun_test-")
- c.Assert(err, IsNil)
- defer os.RemoveAll(realTemp)
-
- cr.HostOutputDir = realTemp
-
- // Absolute path outside output dir
- os.Symlink("/etc/passwd", realTemp+"/p1")
-
- // Relative outside output dir
- os.Symlink("../zip", realTemp+"/p2")
-
- // Circular references
- os.Symlink("p4", realTemp+"/p3")
- os.Symlink("p5", realTemp+"/p4")
- os.Symlink("p3", realTemp+"/p5")
-
- // Target doesn't exist
- os.Symlink("p99", realTemp+"/p6")
-
- for _, v := range []string{"p1", "p2", "p3", "p4", "p5"} {
- info, err := os.Lstat(realTemp + "/" + v)
- c.Assert(err, IsNil)
- _, _, _, err = cr.derefOutputSymlink(realTemp+"/"+v, info)
- c.Assert(err, NotNil)
- }
-}
-
-func (s *TestSuite) TestEvalSymlinkDir(c *C) {
- kc := &KeepTestClient{}
- defer kc.Close()
- cr := NewContainerRunner(&ArvTestClient{callraw: true}, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
-
- realTemp, err := ioutil.TempDir("", "crunchrun_test-")
- c.Assert(err, IsNil)
- defer os.RemoveAll(realTemp)
-
- cr.HostOutputDir = realTemp
-
- // Absolute path outside output dir
- os.Symlink(".", realTemp+"/loop")
-
- v := "loop"
- info, err := os.Lstat(realTemp + "/" + v)
- _, err = cr.UploadOutputFile(realTemp+"/"+v, info, err, []string{}, nil, "", "", 0)
- c.Assert(err, NotNil)
-}
-
func (s *TestSuite) TestFullBrokenDocker1(c *C) {
tf, err := ioutil.TempFile("", "brokenNodeHook-")
c.Assert(err, IsNil)
diff --git a/services/crunch-run/logging_test.go b/services/crunch-run/logging_test.go
index abac2bb..86f8cec 100644
--- a/services/crunch-run/logging_test.go
+++ b/services/crunch-run/logging_test.go
@@ -10,11 +10,14 @@ import (
"testing"
"time"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
. "gopkg.in/check.v1"
)
-type LoggingTestSuite struct{}
+type LoggingTestSuite struct {
+ client *arvados.Client
+}
type TestTimestamper struct {
count int
@@ -32,11 +35,16 @@ func (this *TestTimestamper) Timestamp(t time.Time) string {
// Gocheck boilerplate
var _ = Suite(&LoggingTestSuite{})
+func (s *LoggingTestSuite) SetUpTest(c *C) {
+ s.client = arvados.NewClientFromEnv()
+}
+
func (s *LoggingTestSuite) TestWriteLogs(c *C) {
api := &ArvTestClient{}
kc := &KeepTestClient{}
defer kc.Close()
- cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
cr.CrunchLog.Print("Hello world!")
@@ -45,7 +53,7 @@ func (s *LoggingTestSuite) TestWriteLogs(c *C) {
c.Check(api.Calls, Equals, 1)
- mt, err := cr.LogCollection.ManifestText()
+ mt, err := cr.LogCollection.MarshalManifest(".")
c.Check(err, IsNil)
c.Check(mt, Equals, ". 74561df9ae65ee9f35d5661d42454264+83 0:83:crunch-run.txt\n")
@@ -64,7 +72,8 @@ func (s *LoggingTestSuite) TestWriteLogsLarge(c *C) {
api := &ArvTestClient{}
kc := &KeepTestClient{}
defer kc.Close()
- cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
cr.CrunchLog.Immediate = nil
@@ -77,7 +86,7 @@ func (s *LoggingTestSuite) TestWriteLogsLarge(c *C) {
c.Check(api.Calls > 1, Equals, true)
c.Check(api.Calls < 2000000, Equals, true)
- mt, err := cr.LogCollection.ManifestText()
+ mt, err := cr.LogCollection.MarshalManifest(".")
c.Check(err, IsNil)
c.Check(mt, Equals, ". 9c2c05d1fae6aaa8af85113ba725716d+67108864 80b821383a07266c2a66a4566835e26e+21780065 0:88888929:crunch-run.txt\n")
}
@@ -86,10 +95,13 @@ func (s *LoggingTestSuite) TestWriteMultipleLogs(c *C) {
api := &ArvTestClient{}
kc := &KeepTestClient{}
defer kc.Close()
- cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
ts := &TestTimestamper{}
cr.CrunchLog.Timestamper = ts.Timestamp
- stdout := NewThrottledLogger(cr.NewLogWriter("stdout"))
+ w, err := cr.NewLogWriter("stdout")
+ c.Assert(err, IsNil)
+ stdout := NewThrottledLogger(w)
stdout.Timestamper = ts.Timestamp
cr.CrunchLog.Print("Hello world!")
@@ -112,26 +124,24 @@ func (s *LoggingTestSuite) TestWriteMultipleLogs(c *C) {
2015-12-29T15:51:45.000000004Z Blurb
`)
- mt, err := cr.LogCollection.ManifestText()
+ mt, err := cr.LogCollection.MarshalManifest(".")
c.Check(err, IsNil)
- c.Check(mt, Equals, ""+
- ". 408672f5b5325f7d20edfbf899faee42+83 0:83:crunch-run.txt\n"+
- ". c556a293010069fa79a6790a931531d5+80 0:80:stdout.txt\n")
+ c.Check(mt, Equals, ". 48f9023dc683a850b1c9b482b14c4b97+163 0:83:crunch-run.txt 83:80:stdout.txt\n")
}
func (s *LoggingTestSuite) TestWriteLogsWithRateLimitThrottleBytes(c *C) {
- testWriteLogsWithRateLimit(c, "crunchLogThrottleBytes", 50, 65536, "Exceeded rate 50 bytes per 60 seconds")
+ s.testWriteLogsWithRateLimit(c, "crunchLogThrottleBytes", 50, 65536, "Exceeded rate 50 bytes per 60 seconds")
}
func (s *LoggingTestSuite) TestWriteLogsWithRateLimitThrottleLines(c *C) {
- testWriteLogsWithRateLimit(c, "crunchLogThrottleLines", 1, 1024, "Exceeded rate 1 lines per 60 seconds")
+ s.testWriteLogsWithRateLimit(c, "crunchLogThrottleLines", 1, 1024, "Exceeded rate 1 lines per 60 seconds")
}
func (s *LoggingTestSuite) TestWriteLogsWithRateLimitThrottleBytesPerEvent(c *C) {
- testWriteLogsWithRateLimit(c, "crunchLimitLogBytesPerJob", 50, 67108864, "Exceeded log limit 50 bytes (crunch_limit_log_bytes_per_job)")
+ s.testWriteLogsWithRateLimit(c, "crunchLimitLogBytesPerJob", 50, 67108864, "Exceeded log limit 50 bytes (crunch_limit_log_bytes_per_job)")
}
-func testWriteLogsWithRateLimit(c *C, throttleParam string, throttleValue int, throttleDefault int, expected string) {
+func (s *LoggingTestSuite) testWriteLogsWithRateLimit(c *C, throttleParam string, throttleValue int, throttleDefault int, expected string) {
discoveryMap[throttleParam] = float64(throttleValue)
defer func() {
discoveryMap[throttleParam] = float64(throttleDefault)
@@ -140,7 +150,8 @@ func testWriteLogsWithRateLimit(c *C, throttleParam string, throttleValue int, t
api := &ArvTestClient{}
kc := &KeepTestClient{}
defer kc.Close()
- cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
cr.CrunchLog.Print("Hello world!")
@@ -149,7 +160,7 @@ func testWriteLogsWithRateLimit(c *C, throttleParam string, throttleValue int, t
c.Check(api.Calls, Equals, 1)
- mt, err := cr.LogCollection.ManifestText()
+ mt, err := cr.LogCollection.MarshalManifest(".")
c.Check(err, IsNil)
c.Check(mt, Equals, ". 74561df9ae65ee9f35d5661d42454264+83 0:83:crunch-run.txt\n")
diff --git a/services/crunch-run/upload.go b/services/crunch-run/upload.go
deleted file mode 100644
index ddad8bf..0000000
--- a/services/crunch-run/upload.go
+++ /dev/null
@@ -1,342 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-// Originally based on sdk/go/crunchrunner/upload.go
-//
-// Unlike the original, which iterates over a directory tree and uploads each
-// file sequentially, this version supports opening and writing multiple files
-// in a collection simultaneously.
-//
-// Eventually this should move into the Arvados Go SDK for a more comprehensive
-// implementation of Collections.
-
-import (
- "bytes"
- "crypto/md5"
- "errors"
- "fmt"
- "io"
- "log"
- "os"
- "path/filepath"
- "strings"
- "sync"
-
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- "git.curoverse.com/arvados.git/sdk/go/manifest"
-)
-
-// Block is a data block in a manifest stream
-type Block struct {
- data []byte
- offset int64
-}
-
-// CollectionFileWriter is a Writer that permits writing to a file in a Keep Collection.
-type CollectionFileWriter struct {
- IKeepClient
- *manifest.ManifestStream
- offset uint64
- length uint64
- *Block
- uploader chan *Block
- finish chan []error
- fn string
-}
-
-// Write to a file in a keep collection
-func (m *CollectionFileWriter) Write(p []byte) (int, error) {
- n, err := m.ReadFrom(bytes.NewReader(p))
- return int(n), err
-}
-
-// ReadFrom a Reader and write to the Keep collection file.
-func (m *CollectionFileWriter) ReadFrom(r io.Reader) (n int64, err error) {
- var total int64
- var count int
-
- for err == nil {
- if m.Block == nil {
- m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
- }
- count, err = r.Read(m.Block.data[m.Block.offset:])
- total += int64(count)
- m.Block.offset += int64(count)
- if m.Block.offset == keepclient.BLOCKSIZE {
- m.uploader <- m.Block
- m.Block = nil
- }
- }
-
- m.length += uint64(total)
-
- if err == io.EOF {
- return total, nil
- }
- return total, err
-}
-
-// Close stops writing a file and adds it to the parent manifest.
-func (m *CollectionFileWriter) Close() error {
- m.ManifestStream.FileStreamSegments = append(m.ManifestStream.FileStreamSegments,
- manifest.FileStreamSegment{m.offset, m.length, m.fn})
- return nil
-}
-
-func (m *CollectionFileWriter) NewFile(fn string) {
- m.offset += m.length
- m.length = 0
- m.fn = fn
-}
-
-func (m *CollectionFileWriter) goUpload(workers chan struct{}) {
- var mtx sync.Mutex
- var wg sync.WaitGroup
-
- var errors []error
- uploader := m.uploader
- finish := m.finish
- for block := range uploader {
- mtx.Lock()
- m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, "")
- blockIndex := len(m.ManifestStream.Blocks) - 1
- mtx.Unlock()
-
- workers <- struct{}{} // wait for an available worker slot
- wg.Add(1)
-
- go func(block *Block, blockIndex int) {
- hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
- signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset])
- <-workers
-
- mtx.Lock()
- if err != nil {
- errors = append(errors, err)
- } else {
- m.ManifestStream.Blocks[blockIndex] = signedHash
- }
- mtx.Unlock()
-
- wg.Done()
- }(block, blockIndex)
- }
- wg.Wait()
-
- finish <- errors
-}
-
-// CollectionWriter implements creating new Keep collections by opening files
-// and writing to them.
-type CollectionWriter struct {
- MaxWriters int
- IKeepClient
- Streams []*CollectionFileWriter
- workers chan struct{}
- mtx sync.Mutex
-}
-
-// Open a new file for writing in the Keep collection.
-func (m *CollectionWriter) Open(path string) io.WriteCloser {
- var dir string
- var fn string
-
- i := strings.Index(path, "/")
- if i > -1 {
- dir = "./" + path[0:i]
- fn = path[i+1:]
- } else {
- dir = "."
- fn = path
- }
-
- fw := &CollectionFileWriter{
- m.IKeepClient,
- &manifest.ManifestStream{StreamName: dir},
- 0,
- 0,
- nil,
- make(chan *Block),
- make(chan []error),
- fn}
-
- m.mtx.Lock()
- defer m.mtx.Unlock()
- if m.workers == nil {
- if m.MaxWriters < 1 {
- m.MaxWriters = 2
- }
- m.workers = make(chan struct{}, m.MaxWriters)
- }
-
- go fw.goUpload(m.workers)
-
- m.Streams = append(m.Streams, fw)
-
- return fw
-}
-
-// Finish writing the collection, wait for all blocks to complete uploading.
-func (m *CollectionWriter) Finish() error {
- var errstring string
- m.mtx.Lock()
- defer m.mtx.Unlock()
-
- for _, stream := range m.Streams {
- if stream.uploader == nil {
- continue
- }
- if stream.Block != nil {
- stream.uploader <- stream.Block
- stream.Block = nil
- }
- close(stream.uploader)
- stream.uploader = nil
-
- errors := <-stream.finish
- close(stream.finish)
- stream.finish = nil
-
- for _, r := range errors {
- errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
- }
- }
- if errstring != "" {
- return errors.New(errstring)
- }
- return nil
-}
-
-// ManifestText returns the manifest text of the collection. Calls Finish()
-// first to ensure that all blocks are written and that signed locators and
-// available.
-func (m *CollectionWriter) ManifestText() (mt string, err error) {
- err = m.Finish()
- if err != nil {
- return "", err
- }
-
- var buf bytes.Buffer
-
- m.mtx.Lock()
- defer m.mtx.Unlock()
- for _, v := range m.Streams {
- if len(v.FileStreamSegments) == 0 {
- continue
- }
- k := v.StreamName
- if k == "." {
- buf.WriteString(".")
- } else {
- k = strings.Replace(k, " ", "\\040", -1)
- k = strings.Replace(k, "\n", "", -1)
- buf.WriteString("./" + k)
- }
- if len(v.Blocks) > 0 {
- for _, b := range v.Blocks {
- buf.WriteString(" ")
- buf.WriteString(b)
- }
- } else {
- buf.WriteString(" d41d8cd98f00b204e9800998ecf8427e+0")
- }
- for _, f := range v.FileStreamSegments {
- buf.WriteString(" ")
- name := strings.Replace(f.Name, " ", "\\040", -1)
- name = strings.Replace(name, "\n", "", -1)
- buf.WriteString(fmt.Sprintf("%v:%v:%v", f.SegPos, f.SegLen, name))
- }
- buf.WriteString("\n")
- }
- return buf.String(), nil
-}
-
-type WalkUpload struct {
- MaxWriters int
- kc IKeepClient
- stripPrefix string
- streamMap map[string]*CollectionFileWriter
- status *log.Logger
- workers chan struct{}
- mtx sync.Mutex
-}
-
-func (m *WalkUpload) UploadFile(path string, sourcePath string) error {
- var dir string
- basename := filepath.Base(path)
- if len(path) > (len(m.stripPrefix) + len(basename) + 1) {
- dir = path[len(m.stripPrefix)+1 : (len(path) - len(basename) - 1)]
- }
- if dir == "" {
- dir = "."
- }
-
- fn := path[(len(path) - len(basename)):]
-
- info, err := os.Stat(sourcePath)
- if err != nil {
- return err
- }
- file, err := os.Open(sourcePath)
- if err != nil {
- return err
- }
- defer file.Close()
-
- if m.streamMap[dir] == nil {
- m.streamMap[dir] = &CollectionFileWriter{
- m.kc,
- &manifest.ManifestStream{StreamName: dir},
- 0,
- 0,
- nil,
- make(chan *Block),
- make(chan []error),
- ""}
-
- m.mtx.Lock()
- if m.workers == nil {
- if m.MaxWriters < 1 {
- m.MaxWriters = 2
- }
- m.workers = make(chan struct{}, m.MaxWriters)
- }
- m.mtx.Unlock()
-
- go m.streamMap[dir].goUpload(m.workers)
- }
-
- fileWriter := m.streamMap[dir]
-
- // Reset the CollectionFileWriter for a new file
- fileWriter.NewFile(fn)
-
- m.status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
-
- _, err = io.Copy(fileWriter, file)
- if err != nil {
- m.status.Printf("Uh oh")
- return err
- }
-
- // Commits the current file. Legal to call this repeatedly.
- fileWriter.Close()
-
- return nil
-}
-
-func (cw *CollectionWriter) BeginUpload(root string, status *log.Logger) *WalkUpload {
- streamMap := make(map[string]*CollectionFileWriter)
- return &WalkUpload{0, cw.IKeepClient, root, streamMap, status, nil, sync.Mutex{}}
-}
-
-func (cw *CollectionWriter) EndUpload(wu *WalkUpload) {
- cw.mtx.Lock()
- for _, st := range wu.streamMap {
- cw.Streams = append(cw.Streams, st)
- }
- cw.mtx.Unlock()
-}
diff --git a/services/crunch-run/upload_test.go b/services/crunch-run/upload_test.go
deleted file mode 100644
index 24333c3..0000000
--- a/services/crunch-run/upload_test.go
+++ /dev/null
@@ -1,189 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-import (
- "io/ioutil"
- "log"
- "os"
- "path/filepath"
- "sync"
- "syscall"
-
- . "gopkg.in/check.v1"
-)
-
-type UploadTestSuite struct{}
-
-// Gocheck boilerplate
-var _ = Suite(&UploadTestSuite{})
-
-func writeTree(cw *CollectionWriter, root string, status *log.Logger) (mt string, err error) {
- walkUpload := cw.BeginUpload(root, status)
-
- err = filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
- info, _ = os.Stat(path)
- if info.Mode().IsRegular() {
- return walkUpload.UploadFile(path, path)
- }
- return nil
- })
-
- cw.EndUpload(walkUpload)
- if err != nil {
- return "", err
- }
- mt, err = cw.ManifestText()
- return
-}
-
-func (s *TestSuite) TestSimpleUpload(c *C) {
- tmpdir, _ := ioutil.TempDir("", "")
- defer func() {
- os.RemoveAll(tmpdir)
- }()
-
- ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
-
- kc := &KeepTestClient{}
- defer kc.Close()
- cw := CollectionWriter{0, kc, nil, nil, sync.Mutex{}}
- str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
- c.Check(err, IsNil)
- c.Check(str, Equals, ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt\n")
-}
-
-func (s *TestSuite) TestUploadThreeFiles(c *C) {
- tmpdir, _ := ioutil.TempDir("", "")
- defer func() {
- os.RemoveAll(tmpdir)
- }()
-
- for _, err := range []error{
- ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600),
- ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600),
- os.Symlink("./file2.txt", tmpdir+"/file3.txt"),
- syscall.Mkfifo(tmpdir+"/ignore.fifo", 0600),
- } {
- c.Assert(err, IsNil)
- }
-
- kc := &KeepTestClient{}
- defer kc.Close()
- cw := CollectionWriter{0, kc, nil, nil, sync.Mutex{}}
- str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
-
- c.Check(err, IsNil)
- c.Check(str, Equals, ". aa65a413921163458c52fea478d5d3ee+9 0:3:file1.txt 3:3:file2.txt 6:3:file3.txt\n")
-}
-
-func (s *TestSuite) TestSimpleUploadSubdir(c *C) {
- tmpdir, _ := ioutil.TempDir("", "")
- defer func() {
- os.RemoveAll(tmpdir)
- }()
-
- os.Mkdir(tmpdir+"/subdir", 0700)
-
- ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
- ioutil.WriteFile(tmpdir+"/subdir/file2.txt", []byte("bar"), 0600)
-
- kc := &KeepTestClient{}
- defer kc.Close()
- cw := CollectionWriter{0, kc, nil, nil, sync.Mutex{}}
- str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
-
- c.Check(err, IsNil)
-
- // streams can get added in either order because of scheduling
- // of goroutines.
- if str != `. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
-./subdir 37b51d194a7513e45b56f6524f2d51f2+3 0:3:file2.txt
-` && str != `./subdir 37b51d194a7513e45b56f6524f2d51f2+3 0:3:file2.txt
-. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
-` {
- c.Error("Did not get expected manifest text")
- }
-}
-
-func (s *TestSuite) TestSimpleUploadLarge(c *C) {
- tmpdir, _ := ioutil.TempDir("", "")
- defer func() {
- os.RemoveAll(tmpdir)
- }()
-
- file, _ := os.Create(tmpdir + "/" + "file1.txt")
- data := make([]byte, 1024*1024-1)
- for i := range data {
- data[i] = byte(i % 10)
- }
- for i := 0; i < 65; i++ {
- file.Write(data)
- }
- file.Close()
-
- ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
-
- kc := &KeepTestClient{}
- defer kc.Close()
- cw := CollectionWriter{0, kc, nil, nil, sync.Mutex{}}
- str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
-
- c.Check(err, IsNil)
- c.Check(str, Equals, ". 00ecf01e0d93385115c9f8bed757425d+67108864 485cd630387b6b1846fe429f261ea05f+1048514 0:68157375:file1.txt 68157375:3:file2.txt\n")
-}
-
-func (s *TestSuite) TestUploadEmptySubdir(c *C) {
- tmpdir, _ := ioutil.TempDir("", "")
- defer func() {
- os.RemoveAll(tmpdir)
- }()
-
- os.Mkdir(tmpdir+"/subdir", 0700)
-
- ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
-
- kc := &KeepTestClient{}
- defer kc.Close()
- cw := CollectionWriter{0, kc, nil, nil, sync.Mutex{}}
- str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
-
- c.Check(err, IsNil)
- c.Check(str, Equals, `. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
-`)
-}
-
-func (s *TestSuite) TestUploadEmptyFile(c *C) {
- tmpdir, _ := ioutil.TempDir("", "")
- defer func() {
- os.RemoveAll(tmpdir)
- }()
-
- ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte(""), 0600)
-
- kc := &KeepTestClient{}
- defer kc.Close()
- cw := CollectionWriter{0, kc, nil, nil, sync.Mutex{}}
- str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
-
- c.Check(err, IsNil)
- c.Check(str, Equals, `. d41d8cd98f00b204e9800998ecf8427e+0 0:0:file1.txt
-`)
-}
-
-func (s *TestSuite) TestUploadError(c *C) {
- tmpdir, _ := ioutil.TempDir("", "")
- defer func() {
- os.RemoveAll(tmpdir)
- }()
-
- ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
-
- cw := CollectionWriter{0, &KeepErrorTestClient{}, nil, nil, sync.Mutex{}}
- str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
-
- c.Check(err, NotNil)
- c.Check(str, Equals, "")
-}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list