[ARVADOS] updated: 55fd854ed0b34ac926cbff403aec787678a0b068

git at public.curoverse.com git at public.curoverse.com
Sun Dec 27 17:00:29 EST 2015


Summary of changes:
 sdk/go/keepclient/collectionreader.go   |   4 +
 services/crunch-exec/crunchexec.go      | 189 ++++++++++++++++++++++++++++----
 services/crunch-exec/crunchexec_test.go |  91 +++++++++++++++
 3 files changed, 265 insertions(+), 19 deletions(-)
 create mode 100644 services/crunch-exec/crunchexec_test.go

       via  55fd854ed0b34ac926cbff403aec787678a0b068 (commit)
      from  29482d2345265284af2291c47a9b10c599aab743 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 55fd854ed0b34ac926cbff403aec787678a0b068
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Sun Dec 27 16:52:07 2015 -0500

    7816: Work on getting image from keep and sending to Docker.

diff --git a/sdk/go/keepclient/collectionreader.go b/sdk/go/keepclient/collectionreader.go
index b532a16..d2c171d 100644
--- a/sdk/go/keepclient/collectionreader.go
+++ b/sdk/go/keepclient/collectionreader.go
@@ -40,6 +40,10 @@ func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, fi
 		return nil, ErrNoManifest
 	}
 	m := manifest.Manifest{Text: mText}
+	return kc.ManifestFileReader(m, filename)
+}
+
+func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (ReadCloserWithLen, error) {
 	rdrChan := make(chan *cfReader)
 	go kc.queueSegmentsToGet(m, filename, rdrChan)
 	r, ok := <-rdrChan
diff --git a/services/crunch-exec/crunchexec.go b/services/crunch-exec/crunchexec.go
index e9afa3a..1c9c5dd 100644
--- a/services/crunch-exec/crunchexec.go
+++ b/services/crunch-exec/crunchexec.go
@@ -1,12 +1,17 @@
 package main
 
 import (
-	"fmt"
+	"flag"
+	//"fmt"
+	"errors"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/keepclient"
+	"git.curoverse.com/arvados.git/sdk/go/manifest"
+	"github.com/curoverse/dockerclient"
+	"io"
 	"log"
 	"os"
-	"os/exec"
+	//"os/exec"
 	"os/signal"
 	"strings"
 	"syscall"
@@ -14,46 +19,192 @@ import (
 
 type IArvadosClient interface {
 	Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
+	Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
 	Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
 }
 
-type Mount struct { }
+type IKeepClient interface {
+	PutHB(hash string, buf []byte) (string, int, error)
+	ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
+}
+
+type Mount struct{}
+
+type Collection struct {
+	ManifestText string `json:"manifest_text"`
+}
 
 type Container struct {
-	Uuid string `json:"uuid"`,
-	Command []string `json:"command"`,
-	ContainerImage string `json:"container_image`,
-	Cwd string `json:"cwd"`,
-	Environment map[string]string `json:"environment"`,
-	Mounts map[string]Mount `json:"mounts"`,
- 	OutputPath string `json:"output_path"`,
-	Priority int `json:"priority"`,
+	Uuid               string            `json:"uuid"`
+	Command            []string          `json:"command"`
+	ContainerImage     string            `json:"container_image`
+	Cwd                string            `json:"cwd"`
+	Environment        map[string]string `json:"environment"`
+	Mounts             map[string]Mount  `json:"mounts"`
+	OutputPath         string            `json:"output_path"`
+	Priority           int               `json:"priority"`
 	RuntimeConstraints map[string]string `json:"runtime_constraints"`
+	State              string            `json:"state"`
+}
+
+// Callback used to listen to Docker's events
+func dockerEvent(event *dockerclient.Event, ec chan error, args ...interface{}) {
+	log.Printf("Received event: %#v\n", *event)
+}
+
+type ContainerRunner struct {
+	Docker *dockerclient.DockerClient
+	Api    IArvadosClient
+	Kc     IKeepClient
+	Container
+	dockerclient.ContainerConfig
+}
+
+func (this *ContainerRunner) setupMonitoring() error {
+	this.Docker.StartMonitorEvents(dockerEvent, nil)
+
+	sigChan := make(chan os.Signal, 1)
+	signal.Notify(sigChan, syscall.SIGTERM)
+	signal.Notify(sigChan, syscall.SIGINT)
+	signal.Notify(sigChan, syscall.SIGQUIT)
+
+	go func(sig <-chan os.Signal) {
+		//for sig := range sig {
+		//}
+	}(sigChan)
+
+	return nil
+}
+
+func (this *ContainerRunner) LoadImage() (err error) {
+	var collection Collection
+	err = this.Api.Get("collections", this.Container.ContainerImage, nil, &collection)
+	if err != nil {
+		return err
+	}
+	manifest := manifest.Manifest{Text: collection.ManifestText}
+	var img, imageId string
+	for ms := range manifest.StreamIter() {
+		img = ms.FileTokens[0].Name
+		if !strings.HasSuffix(img, ".tar") {
+			return errors.New("First file in the collection does not end in .tar")
+		}
+		imageId = img[len(img)-4:]
+	}
+
+	_, err = this.Docker.InspectImage(imageId)
+	if err != nil {
+		var readCloser io.ReadCloser
+		readCloser, err = this.Kc.ManifestFileReader(manifest, img)
+		if err != nil {
+			return err
+		}
+
+		err = this.Docker.LoadImage(readCloser)
+		if err != nil {
+			return err
+		}
+	}
+
+	this.ContainerConfig.Image = imageId
+
+	return nil
+}
+
+func (this *ContainerRunner) startContainer() error {
+	containerId, err := this.Docker.CreateContainer(&this.ContainerConfig, "foobar", nil)
+	if err != nil {
+		return err
+	}
+	hostConfig := &dockerclient.HostConfig{}
+	err = this.Docker.StartContainer(containerId, hostConfig)
+	if err != nil {
+		return err
+	}
+
+	return nil
 }
 
-func runner(api IArvadosClient,
-	kc IKeepClient,
-	containerUuid string) error {
+func (this *ContainerRunner) getLogs() error {
+	return nil
+}
+
+func (this *ContainerRunner) waitFinish() error {
+	return nil
+}
+
+func (this *ContainerRunner) writeLogs() error {
+	return nil
+}
+
+func (this *ContainerRunner) updateContainer() error {
+	return nil
+}
+
+func (this *ContainerRunner) Run(containerUuid string) (err error) {
+
+	err = this.Api.Get("containers", containerUuid, nil, &this.Container)
+	if err != nil {
+		return
+	}
+
+	// (0) start event monitoring goroutines
+	err = this.setupMonitoring()
+	if err != nil {
+		return
+	}
+
+	// (1) check for and/or load image
+	err = this.LoadImage()
+	if err != nil {
+		return
+	}
+
+	// (2) start container
+	err = this.startContainer()
+	if err != nil {
+		return
+	}
+
+	// (3) attach container logs
+	err = this.getLogs()
+
+	// (4) wait for container to finish
+	err = this.waitFinish()
+
+	// (5) write logs
+	err = this.writeLogs()
+
+	// (6) update container record with results
+	this.updateContainer()
+
+	return
 }
 
 func main() {
+	flag.Parse()
+
+	var cr ContainerRunner
+
 	api, err := arvadosclient.MakeArvadosClient()
 	if err != nil {
 		log.Fatal(err)
 	}
+	cr.Api = api
 
-	var containerStruct Container
-	err = api.Get("containers", containerUuid, nil, &containerStruct)
+	cr.Kc, err = keepclient.MakeKeepClient(&api)
 	if err != nil {
 		log.Fatal(err)
 	}
 
-	var kc IKeepClient
-	kc, err = keepclient.MakeKeepClient(&api)
+	cr.Docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
 	if err != nil {
 		log.Fatal(err)
 	}
 
-	err = runner(api, kc, jobStruct)
+	err = cr.Run(flag.Arg(0))
+	if err != nil {
+		log.Fatal(err)
+	}
 
 }
diff --git a/services/crunch-exec/crunchexec_test.go b/services/crunch-exec/crunchexec_test.go
new file mode 100644
index 0000000..c2cb506
--- /dev/null
+++ b/services/crunch-exec/crunchexec_test.go
@@ -0,0 +1,91 @@
+package main
+
+import (
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/keepclient"
+	"git.curoverse.com/arvados.git/sdk/go/manifest"
+	"github.com/curoverse/dockerclient"
+	. "gopkg.in/check.v1"
+	"io"
+	//"io/ioutil"
+	//"log"
+	"os"
+	//"syscall"
+	"testing"
+	//"time"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+	TestingT(t)
+}
+
+type TestSuite struct{}
+
+// Gocheck boilerplate
+var _ = Suite(&TestSuite{})
+
+type ArvTestClient struct {
+	c        *C
+	manifest string
+	success  bool
+}
+
+type KeepTestClient struct {
+	c        *C
+	manifest string
+	success  bool
+}
+
+func (this ArvTestClient) Create(resourceType string,
+	parameters arvadosclient.Dict,
+	output interface{}) error {
+	return nil
+}
+
+var busyboxManifest = ". 59950b5bd8b0854ac44669c5559c4358+1321984+Af83e8fa245ab84f5817064ba4e99aed87060bea5 at 5692cadc 0:1321984:fc0db02f30724abc777d7ae2b2404c6d074f1e2ceca19912352aea30a42f50b7.tar\n"
+var busyboxPDH = "d4ab34d3d4f8a72f5c4973051ae69fab+122"
+
+func (this ArvTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error {
+	if resourceType == "collections" {
+		if uuid == busyboxPDH {
+			output.(*Collection).ManifestText = busyboxManifest
+
+		}
+	}
+	return nil
+}
+
+func (this ArvTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+	return nil
+}
+
+func (this KeepTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+	return "", 0, nil
+}
+
+type FileWrapper struct {
+	io.ReadCloser
+	len uint64
+}
+
+func (this FileWrapper) Len() uint64 {
+	return this.len
+}
+
+func (this KeepTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
+	if filename == "fc0db02f30724abc777d7ae2b2404c6d074f1e2ceca19912352aea30a42f50b7.tar" {
+		rdr, err := os.Open("fc0db02f30724abc777d7ae2b2404c6d074f1e2ceca19912352aea30a42f50b7.tar")
+		return FileWrapper{rdr, 1321984}, err
+	}
+	return nil, nil
+}
+
+func (s *TestSuite) TestLoadImage(c *C) {
+	cr := ContainerRunner{}
+	cr.Docker, _ = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+	cr.Api = ArvTestClient{manifest: busyboxManifest}
+	cr.Kc = KeepTestClient{}
+	cr.Container.ContainerImage = busyboxPDH
+	cr.LoadImage()
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list