[ARVADOS] created: 08d080d5a4edccaa579f9a5b5cfa18c9ac471430

git at public.curoverse.com git at public.curoverse.com
Fri Feb 12 00:04:25 EST 2016


        at  08d080d5a4edccaa579f9a5b5cfa18c9ac471430 (commit)


commit 08d080d5a4edccaa579f9a5b5cfa18c9ac471430
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Feb 12 00:04:20 2016 -0500

    8015: Crunch run mounting and output upload WIP

diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 640ac88..1248854 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -8,8 +8,10 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/manifest"
 	"github.com/curoverse/dockerclient"
 	"io"
+	"ioutil"
 	"log"
 	"os"
+	"os/exec"
 	"os/signal"
 	"strings"
 	"sync"
@@ -33,7 +35,13 @@ type IKeepClient interface {
 }
 
 // Mount describes the mount points to create inside the container.
-type Mount struct{}
+type Mount struct {
+	Kind             string `json:"kind"`
+	Writable         bool   `json:"writable"`
+	PortableDataHash string `json:"portable_data_hash"`
+	UUID             string `json:"uuid"`
+	DeviceType       string `json:"device_type"`
+}
 
 // Collection record returned by the API server.
 type Collection struct {
@@ -86,6 +94,8 @@ type ContainerRunner struct {
 	Stderr        *ThrottledLogger
 	LogCollection *CollectionWriter
 	LogsPDH       *string
+	ArvMount      *exec.Cmd
+	ArvMountPoint string
 	CancelLock    sync.Mutex
 	Cancelled     bool
 	SigChan       chan os.Signal
@@ -163,6 +173,61 @@ func (runner *ContainerRunner) LoadImage() (err error) {
 	return nil
 }
 
+func (runner *ContainerRunner) SetupMounts(hostConfig *dockerclient.HostConfig) (err error) {
+	runner.ArvMountPoint = ioutil.TempDir("", "keep")
+	pdhOnly := true
+	tmpcount := 0
+	arvMountCmd := []string{"--foreground"}
+
+	for bind, mnt := range runner.ContainerRecord.Mounts {
+		if mnt.Kind == "collection" {
+			var src string
+			if mnt.UUID != "" && mnt.PortableDataHash != "" {
+				return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection")
+			}
+			if mnt.UUID != "" {
+				if mnt.Writable {
+					return fmt.Errorf("Writing to collection currently not permitted.")
+				}
+				pdhOnly = false
+				src = fmt.Sprintf("%s/by_id/%s", arvMountPoint, mnt.UUID)
+			} else if mnt.PortableDataHash != "" {
+				if mnt.Writable {
+					return fmt.Errorf("Can never write to a collection specified by portable data hash")
+				}
+				src = fmt.Sprintf("%s/by_id/%s", arvMountPoint, mnt.PortableDataHash)
+			} else {
+				src = fmt.Sprintf("%s/tmp%i", arvMountPoint, tmpcount)
+				arvMountCmd = append(arvMountCmd, "--mount-tmp")
+				arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%i", tmpcount))
+				tmpcount += 1
+			}
+			if mnt.Writable {
+				hostConfig.Binds = append(hostConfig.Binds, fmt.Sprintf("%s:%s", src, bind))
+			} else {
+				hostConfig.Binds = append(hostConfig.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
+			}
+		} else if mnt.Kind == "tmp" {
+			hostConfig.Binds = append(hostConfig.Binds, bind)
+		} else {
+			return fmt.Errorf("Unknown mount kind '%s'", mnt.Kind)
+		}
+	}
+
+	if pdhOnly {
+		arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
+	} else {
+		arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
+	}
+	arvMountCmd = append(arvMountCmd, arvMountPoint)
+
+	runner.ArvMount = exec.Command("arv-mount", arvMountCmd)
+	err = runner.ArvMount.Start()
+	if err != nil {
+		return err
+	}
+}
+
 // StartContainer creates the container and runs it.
 func (runner *ContainerRunner) StartContainer() (err error) {
 	runner.CrunchLog.Print("Creating Docker container")
@@ -187,6 +252,11 @@ func (runner *ContainerRunner) StartContainer() (err error) {
 	}
 	hostConfig := &dockerclient.HostConfig{}
 
+	err = runner.SetupMounts(hostConfig)
+	if err != nil {
+		return
+	}
+
 	runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
 	err = runner.Docker.StartContainer(runner.ContainerID, hostConfig)
 	if err != nil {
@@ -239,6 +309,9 @@ func (runner *ContainerRunner) WaitFinish() error {
 	runner.Stdout.Close()
 	runner.Stderr.Close()
 
+	umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
+	umount.Run()
+
 	return nil
 }
 
diff --git a/services/crunch-run/upload.go b/services/crunch-run/upload.go
index 4a2693a..d50ea58 100644
--- a/services/crunch-run/upload.go
+++ b/services/crunch-run/upload.go
@@ -191,3 +191,74 @@ func (m *CollectionWriter) ManifestText() (mt string, err error) {
 	}
 	return buf.String(), nil
 }
+
+func (m *CollectionWriter) WalkFunc(path string, info os.FileInfo, err error, stripPrefix string) error {
+	if info.IsDir() {
+		return nil
+	}
+
+	var dir string
+	if len(path) > (len(stripPrefix) + len(info.Name()) + 1) {
+		dir = path[len(stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
+	}
+	if dir == "" {
+		dir = "."
+	}
+
+	fn := path[(len(path) - len(info.Name())):]
+
+	if m.Streams[dir] == nil {
+		m.Streams[dir] = &CollectionFileWriter{
+			m.IKeepClient,
+			&manifest.ManifestStream{StreamName: dir},
+			0,
+			0,
+			nil,
+			make(chan *Block),
+			make(chan []error),
+			""}
+		go m.Streams[dir].goUpload()
+	}
+
+	stream := m.Streams[dir]
+
+	fileStart := stream.offset
+
+	file, err := os.Open(path)
+	if err != nil {
+		return err
+	}
+
+	log.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
+
+	var count int64
+	count, err = io.Copy(stream, file)
+	if err != nil {
+		return err
+	}
+
+	stream.offset += count
+
+	stream.ManifestStream.FileStreamSegments = append(stream.ManifestStream.FileStreamSegments,
+		manifest.FileStreamSegment{uint64(fileStart), uint64(count), fn})
+
+	return nil
+}
+
+func WriteTree(kc IKeepClient, root string) (manifest string, err error) {
+	mw := CollectionWriter{kc, root, map[string]*ManifestStreamWriter{}}
+	err = filepath.Walk(root, func(path string, info os.FileInfo, err error) {
+		return mw.WalkFunc(path, info, err, root)
+	})
+
+	if err != nil {
+		return "", err
+	}
+
+	err = mw.Finish()
+	if err != nil {
+		return "", err
+	}
+
+	return mw.ManifestText(), nil
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list