[ARVADOS] created: 08d080d5a4edccaa579f9a5b5cfa18c9ac471430
git at public.curoverse.com
git at public.curoverse.com
Fri Feb 12 00:04:25 EST 2016
at 08d080d5a4edccaa579f9a5b5cfa18c9ac471430 (commit)
commit 08d080d5a4edccaa579f9a5b5cfa18c9ac471430
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Feb 12 00:04:20 2016 -0500
8015: Crunch run mounting and output upload WIP
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 640ac88..1248854 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -8,8 +8,10 @@ import (
"git.curoverse.com/arvados.git/sdk/go/manifest"
"github.com/curoverse/dockerclient"
"io"
+	"io/ioutil"
"log"
"os"
+ "os/exec"
"os/signal"
"strings"
"sync"
@@ -33,7 +35,13 @@ type IKeepClient interface {
}
// Mount describes the mount points to create inside the container.
-type Mount struct{}
+type Mount struct {
+ Kind string `json:"kind"` // mount type; SetupMounts accepts "collection" and "tmp" and rejects anything else
+ Writable bool `json:"writable"` // when false, a collection is bound into the container with the ":ro" suffix
+ PortableDataHash string `json:"portable_data_hash"` // selects the collection by portable data hash; mutually exclusive with UUID
+ UUID string `json:"uuid"` // selects the collection by UUID; mutually exclusive with PortableDataHash
+ DeviceType string `json:"device_type"` // NOTE(review): not read by any code visible in this change; confirm intended use
+}
// Collection record returned by the API server.
type Collection struct {
@@ -86,6 +94,8 @@ type ContainerRunner struct {
Stderr *ThrottledLogger
LogCollection *CollectionWriter
LogsPDH *string
+ ArvMount *exec.Cmd
+ ArvMountPoint string
CancelLock sync.Mutex
Cancelled bool
SigChan chan os.Signal
@@ -163,6 +173,61 @@ func (runner *ContainerRunner) LoadImage() (err error) {
return nil
}
+// SetupMounts prepares the host side of the container's mounts.
+//
+// It creates a temporary directory to serve as the arv-mount mount point
+// (stored in runner.ArvMountPoint), translates every entry of
+// runner.ContainerRecord.Mounts into a bind on hostConfig, and finally starts
+// the arv-mount process (stored in runner.ArvMount) that backs those binds.
+//
+// For "collection" mounts the bind source is <mountpoint>/by_id/<uuid or pdh>.
+// A collection mount that names neither a UUID nor a portable data hash gets
+// its own arv-mount temporary collection (tmp0, tmp1, ...), and is the only
+// kind of collection that may be writable. "tmp" mounts are passed to Docker
+// as a bare container path.
+//
+// An error is returned if the temporary directory cannot be created, if a
+// mount description is invalid, or if arv-mount cannot be started. On any
+// error the temporary directory is removed again.
+func (runner *ContainerRunner) SetupMounts(hostConfig *dockerclient.HostConfig) (err error) {
+	// ioutil.TempDir returns (name, error); both results must be captured.
+	runner.ArvMountPoint, err = ioutil.TempDir("", "keep")
+	if err != nil {
+		return err
+	}
+	defer func() {
+		if err != nil {
+			// Every error path returns before arv-mount is running, so the
+			// directory is still empty and unmounted. Cleanup is best-effort:
+			// the original error is the one worth reporting.
+			_ = os.RemoveAll(runner.ArvMountPoint)
+		}
+	}()
+
+	pdhOnly := true
+	tmpcount := 0
+	arvMountCmd := []string{"--foreground"}
+
+	for bind, mnt := range runner.ContainerRecord.Mounts {
+		switch mnt.Kind {
+		case "collection":
+			var src string
+			switch {
+			case mnt.UUID != "" && mnt.PortableDataHash != "":
+				return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection")
+			case mnt.UUID != "":
+				if mnt.Writable {
+					return fmt.Errorf("Writing to collection currently not permitted.")
+				}
+				// At least one mount is addressed by UUID, so arv-mount must
+				// be started with --mount-by-id instead of --mount-by-pdh.
+				pdhOnly = false
+				src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
+			case mnt.PortableDataHash != "":
+				if mnt.Writable {
+					return fmt.Errorf("Can never write to a collection specified by portable data hash")
+				}
+				src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash)
+			default:
+				// No existing collection named: ask arv-mount for a fresh
+				// temporary collection and bind that.
+				tmpName := fmt.Sprintf("tmp%d", tmpcount)
+				src = fmt.Sprintf("%s/%s", runner.ArvMountPoint, tmpName)
+				arvMountCmd = append(arvMountCmd, "--mount-tmp", tmpName)
+				tmpcount++
+			}
+			if mnt.Writable {
+				hostConfig.Binds = append(hostConfig.Binds, fmt.Sprintf("%s:%s", src, bind))
+			} else {
+				hostConfig.Binds = append(hostConfig.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
+			}
+		case "tmp":
+			hostConfig.Binds = append(hostConfig.Binds, bind)
+		default:
+			return fmt.Errorf("Unknown mount kind '%s'", mnt.Kind)
+		}
+	}
+
+	if pdhOnly {
+		arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
+	} else {
+		arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
+	}
+	arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
+
+	// exec.Command takes the arguments variadically, hence the "...".
+	runner.ArvMount = exec.Command("arv-mount", arvMountCmd...)
+	return runner.ArvMount.Start()
+}
+
+
// StartContainer creates the container and runs it.
func (runner *ContainerRunner) StartContainer() (err error) {
runner.CrunchLog.Print("Creating Docker container")
@@ -187,6 +252,11 @@ func (runner *ContainerRunner) StartContainer() (err error) {
}
hostConfig := &dockerclient.HostConfig{}
+ err = runner.SetupMounts(hostConfig)
+ if err != nil {
+ return
+ }
+
runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
err = runner.Docker.StartContainer(runner.ContainerID, hostConfig)
if err != nil {
@@ -239,6 +309,9 @@ func (runner *ContainerRunner) WaitFinish() error {
runner.Stdout.Close()
runner.Stderr.Close()
+ umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
+ umount.Run()
+
return nil
}
diff --git a/services/crunch-run/upload.go b/services/crunch-run/upload.go
index 4a2693a..d50ea58 100644
--- a/services/crunch-run/upload.go
+++ b/services/crunch-run/upload.go
@@ -191,3 +191,74 @@ func (m *CollectionWriter) ManifestText() (mt string, err error) {
}
return buf.String(), nil
}
+
+// WalkFunc adds one entry visited by filepath.Walk to the collection being
+// written.
+//
+// path, info and err are the values filepath.Walk hands to its callback;
+// stripPrefix is the walk root, which is removed from path to obtain the
+// manifest stream name ("." for files directly under the root). Directories
+// are skipped. For a file, the stream writer for its directory is created on
+// first use (together with its upload goroutine), the file's contents are
+// copied into that stream, and a file segment recording the starting offset
+// and byte count is appended to the stream's manifest.
+//
+// It returns the walk error it was given, or any error from opening or
+// copying the file; a non-nil return stops the walk.
+func (m *CollectionWriter) WalkFunc(path string, info os.FileInfo, err error, stripPrefix string) error {
+	// filepath.Walk reports a failed stat or directory read through err, and
+	// info may be nil in that case, so it must be checked before info is used.
+	if err != nil {
+		return err
+	}
+	if info.IsDir() {
+		return nil
+	}
+
+	// path is stripPrefix + "/" + dir + "/" + name; cut out the dir part.
+	var dir string
+	if len(path) > (len(stripPrefix) + len(info.Name()) + 1) {
+		dir = path[len(stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
+	}
+	if dir == "" {
+		dir = "."
+	}
+
+	fn := path[(len(path) - len(info.Name())):]
+
+	if m.Streams[dir] == nil {
+		m.Streams[dir] = &CollectionFileWriter{
+			m.IKeepClient,
+			&manifest.ManifestStream{StreamName: dir},
+			0,
+			0,
+			nil,
+			make(chan *Block),
+			make(chan []error),
+			""}
+		go m.Streams[dir].goUpload()
+	}
+
+	stream := m.Streams[dir]
+
+	// Remember where this file begins within the stream before writing it.
+	fileStart := stream.offset
+
+	file, err := os.Open(path)
+	if err != nil {
+		return err
+	}
+	// Release the descriptor on every exit path; a large tree would otherwise
+	// exhaust the process's open-file limit.
+	defer file.Close()
+
+	log.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
+
+	count, err := io.Copy(stream, file)
+	if err != nil {
+		return err
+	}
+
+	stream.offset += count
+
+	stream.ManifestStream.FileStreamSegments = append(stream.ManifestStream.FileStreamSegments,
+		manifest.FileStreamSegment{uint64(fileStart), uint64(count), fn})
+
+	return nil
+}
+
+// WriteTree uploads every file below root through kc and returns the manifest
+// text describing the resulting collection.
+//
+// The tree is traversed with filepath.Walk, handing each entry to
+// CollectionWriter.WalkFunc with root as the prefix to strip. Once the walk
+// completes the writer is finished and its manifest text is returned. The
+// first error from the walk, from Finish, or from ManifestText is returned
+// with an empty manifest.
+func WriteTree(kc IKeepClient, root string) (manifest string, err error) {
+	// The map holds *CollectionFileWriter because that is what WalkFunc stores
+	// in m.Streams.
+	// NOTE(review): the CollectionWriter declaration is outside this change;
+	// confirm this positional literal matches its fields.
+	mw := CollectionWriter{kc, root, map[string]*CollectionFileWriter{}}
+	err = filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
+		return mw.WalkFunc(path, info, err, root)
+	})
+	if err != nil {
+		return "", err
+	}
+
+	err = mw.Finish()
+	if err != nil {
+		return "", err
+	}
+
+	// ManifestText already returns (string, error); forward both results.
+	return mw.ManifestText()
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list