[ARVADOS] updated: 55fd854ed0b34ac926cbff403aec787678a0b068
git at public.curoverse.com
git at public.curoverse.com
Sun Dec 27 17:00:29 EST 2015
Summary of changes:
sdk/go/keepclient/collectionreader.go | 4 +
services/crunch-exec/crunchexec.go | 189 ++++++++++++++++++++++++++++----
services/crunch-exec/crunchexec_test.go | 91 +++++++++++++++
3 files changed, 265 insertions(+), 19 deletions(-)
create mode 100644 services/crunch-exec/crunchexec_test.go
via 55fd854ed0b34ac926cbff403aec787678a0b068 (commit)
from 29482d2345265284af2291c47a9b10c599aab743 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 55fd854ed0b34ac926cbff403aec787678a0b068
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Sun Dec 27 16:52:07 2015 -0500
7816: Work on getting image from keep and sending to Docker.
diff --git a/sdk/go/keepclient/collectionreader.go b/sdk/go/keepclient/collectionreader.go
index b532a16..d2c171d 100644
--- a/sdk/go/keepclient/collectionreader.go
+++ b/sdk/go/keepclient/collectionreader.go
@@ -40,6 +40,10 @@ func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, fi
return nil, ErrNoManifest
}
m := manifest.Manifest{Text: mText}
+ return kc.ManifestFileReader(m, filename)
+}
+
+func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (ReadCloserWithLen, error) {
rdrChan := make(chan *cfReader)
go kc.queueSegmentsToGet(m, filename, rdrChan)
r, ok := <-rdrChan
diff --git a/services/crunch-exec/crunchexec.go b/services/crunch-exec/crunchexec.go
index e9afa3a..1c9c5dd 100644
--- a/services/crunch-exec/crunchexec.go
+++ b/services/crunch-exec/crunchexec.go
@@ -1,12 +1,17 @@
package main
import (
- "fmt"
+ "flag"
+ //"fmt"
+ "errors"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "git.curoverse.com/arvados.git/sdk/go/manifest"
+ "github.com/curoverse/dockerclient"
+ "io"
"log"
"os"
- "os/exec"
+ //"os/exec"
"os/signal"
"strings"
"syscall"
@@ -14,46 +19,192 @@ import (
// IArvadosClient is the set of Arvados API calls this program relies on.
// ContainerRunner.Api holds a value of this type, which lets a stub stand in
// for the real client.
type IArvadosClient interface {
Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
+ Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
}
-type Mount struct { }
+// IKeepClient is the set of Keep operations this program relies on.
+// ContainerRunner.Kc holds a value of this type.
+type IKeepClient interface {
+	// PutHB takes a hash and a buffer; presumably it stores the buffer in
+	// Keep under that hash (verify against keepclient).
+	PutHB(hash string, buf []byte) (string, int, error)
+
+	// ManifestFileReader returns a reader, with a known length, for the
+	// file called filename within manifest m.
+	ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
+}
+
+// Mount is the value type of Container.Mounts; it has no fields yet.
+type Mount struct{}
+
+// Collection holds the single field of a "collections" API record that this
+// program reads: the manifest text.
+type Collection struct {
+	ManifestText string `json:"manifest_text"`
+}
type Container struct {
- Uuid string `json:"uuid"`,
- Command []string `json:"command"`,
- ContainerImage string `json:"container_image`,
- Cwd string `json:"cwd"`,
- Environment map[string]string `json:"environment"`,
- Mounts map[string]Mount `json:"mounts"`,
- OutputPath string `json:"output_path"`,
- Priority int `json:"priority"`,
+ Uuid string `json:"uuid"`
+ Command []string `json:"command"`
+ ContainerImage string `json:"container_image`
+ Cwd string `json:"cwd"`
+ Environment map[string]string `json:"environment"`
+ Mounts map[string]Mount `json:"mounts"`
+ OutputPath string `json:"output_path"`
+ Priority int `json:"priority"`
RuntimeConstraints map[string]string `json:"runtime_constraints"`
+ State string `json:"state"`
+}
+
+// dockerEvent is the callback handed to StartMonitorEvents. It logs each
+// Docker event it receives and does nothing else; ec and args are unused.
+func dockerEvent(event *dockerclient.Event, ec chan error, args ...interface{}) {
+	received := *event
+	log.Printf("Received event: %#v\n", received)
+}
+
+// ContainerRunner bundles the clients and state needed to run one container.
+type ContainerRunner struct {
+	// Docker is used to inspect and load images and to create and start
+	// the container.
+	Docker *dockerclient.DockerClient
+	// Api is used to fetch the container and collection records.
+	Api IArvadosClient
+	// Kc is used to read the image file out of Keep.
+	Kc IKeepClient
+	// Container is the embedded container record that Run populates.
+	Container
+	// ContainerConfig is the embedded Docker configuration handed to
+	// CreateContainer; LoadImage sets its Image field.
+	dockerclient.ContainerConfig
+}
+
+// setupMonitoring subscribes dockerEvent to Docker's event stream and
+// registers a channel for SIGTERM, SIGINT and SIGQUIT. It always returns nil.
+//
+// NOTE(review): the goroutine started here has no body yet (its loop is
+// commented out), so nothing ever reads the signals sent to the channel.
+func (this *ContainerRunner) setupMonitoring() error {
+	this.Docker.StartMonitorEvents(dockerEvent, nil)
+
+	signals := make(chan os.Signal, 1)
+	// One Notify call with all three signals is equivalent to one call each.
+	signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
+
+	go func(sig <-chan os.Signal) {
+		//for sig := range sig {
+		//}
+	}(signals)
+
+	return nil
+}
+
+// LoadImage makes sure the Docker image named by Container.ContainerImage is
+// available to the Docker daemon and records its ID in ContainerConfig.Image.
+//
+// It fetches the collection record, takes the file name from the first file
+// token of each stream (the last stream wins) and requires a ".tar" suffix;
+// the name without that suffix is used as the Docker image ID. When
+// InspectImage reports an error for that ID, the tarball is read through Kc
+// and handed to Docker.LoadImage.
+//
+// It returns any error from the API, Keep or Docker calls, or an error
+// describing why no usable image file was found in the collection.
+func (this *ContainerRunner) LoadImage() (err error) {
+	var collection Collection
+	err = this.Api.Get("collections", this.Container.ContainerImage, nil, &collection)
+	if err != nil {
+		return err
+	}
+
+	// Named m so the local variable does not shadow the manifest package.
+	m := manifest.Manifest{Text: collection.ManifestText}
+	var img, imageId string
+	for ms := range m.StreamIter() {
+		// Guard the index below: a stream without files would panic.
+		if len(ms.FileTokens) == 0 {
+			return errors.New("stream in the collection has no files")
+		}
+		img = ms.FileTokens[0].Name
+		if !strings.HasSuffix(img, ".tar") {
+			return errors.New("First file in the collection does not end in .tar")
+		}
+		// The image ID is the file name WITHOUT the ".tar" suffix. Slicing
+		// from len(img)-4 onwards would keep only the suffix itself.
+		imageId = strings.TrimSuffix(img, ".tar")
+	}
+	// Covers a manifest with no streams as well as a file named just ".tar".
+	if imageId == "" {
+		return errors.New("no image tarball found in the collection")
+	}
+
+	_, err = this.Docker.InspectImage(imageId)
+	if err != nil {
+		// Any inspect error is treated as "image not loaded yet".
+		var readCloser io.ReadCloser
+		readCloser, err = this.Kc.ManifestFileReader(m, img)
+		if err != nil {
+			return err
+		}
+		// Release the Keep reader once Docker has consumed it or failed.
+		defer readCloser.Close()
+
+		err = this.Docker.LoadImage(readCloser)
+		if err != nil {
+			return err
+		}
+	}
+
+	this.ContainerConfig.Image = imageId
+
+	return nil
+}
+
+// startContainer creates a container named "foobar" from ContainerConfig and
+// starts it with an empty HostConfig, returning the first error encountered.
+func (this *ContainerRunner) startContainer() error {
+	id, createErr := this.Docker.CreateContainer(&this.ContainerConfig, "foobar", nil)
+	if createErr != nil {
+		return createErr
+	}
+
+	// StartContainer already yields nil on success, so pass its result on.
+	return this.Docker.StartContainer(id, &dockerclient.HostConfig{})
}
-func runner(api IArvadosClient,
- kc IKeepClient,
- containerUuid string) error {
+// getLogs is the hook Run calls for step (3), attaching the container logs.
+// It is not implemented yet and always returns nil.
+func (this *ContainerRunner) getLogs() error {
+	return nil
+}
+
+// waitFinish is the hook Run calls for step (4), waiting for the container
+// to finish. It is not implemented yet and always returns nil.
+func (this *ContainerRunner) waitFinish() error {
+	return nil
+}
+
+// writeLogs is the hook Run calls for step (5), writing the logs.
+// It is not implemented yet and always returns nil.
+func (this *ContainerRunner) writeLogs() error {
+	return nil
+}
+
+// updateContainer is the hook Run calls for step (6), updating the container
+// record with results. It is not implemented yet and always returns nil.
+func (this *ContainerRunner) updateContainer() error {
+	return nil
+}
+
+// Run fetches the container record identified by containerUuid and drives it
+// through the numbered steps below.
+//
+// A failure while fetching the record or in steps (0)-(2) aborts immediately.
+// Steps (3)-(6) are all attempted even if one of them fails, and the first
+// error among them is returned, so no failure is silently overwritten by a
+// later step.
+func (this *ContainerRunner) Run(containerUuid string) (err error) {
+	err = this.Api.Get("containers", containerUuid, nil, &this.Container)
+	if err != nil {
+		return err
+	}
+
+	// (0) start event monitoring goroutines
+	if err = this.setupMonitoring(); err != nil {
+		return err
+	}
+
+	// (1) check for and/or load image
+	if err = this.LoadImage(); err != nil {
+		return err
+	}
+
+	// (2) start container
+	if err = this.startContainer(); err != nil {
+		return err
+	}
+
+	// From here on every step runs regardless of earlier failures; only the
+	// first error is kept for the caller.
+	finishing := []func() error{
+		this.getLogs,         // (3) attach container logs
+		this.waitFinish,      // (4) wait for container to finish
+		this.writeLogs,       // (5) write logs
+		this.updateContainer, // (6) update container record with results
+	}
+	for _, step := range finishing {
+		if stepErr := step(); stepErr != nil && err == nil {
+			err = stepErr
+		}
+	}
+
+	return err
}
func main() {
+ flag.Parse()
+
+ var cr ContainerRunner
+
api, err := arvadosclient.MakeArvadosClient()
if err != nil {
log.Fatal(err)
}
+ cr.Api = api
- var containerStruct Container
- err = api.Get("containers", containerUuid, nil, &containerStruct)
+ cr.Kc, err = keepclient.MakeKeepClient(&api)
if err != nil {
log.Fatal(err)
}
- var kc IKeepClient
- kc, err = keepclient.MakeKeepClient(&api)
+ cr.Docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
if err != nil {
log.Fatal(err)
}
- err = runner(api, kc, jobStruct)
+ err = cr.Run(flag.Arg(0))
+ if err != nil {
+ log.Fatal(err)
+ }
}
diff --git a/services/crunch-exec/crunchexec_test.go b/services/crunch-exec/crunchexec_test.go
new file mode 100644
index 0000000..c2cb506
--- /dev/null
+++ b/services/crunch-exec/crunchexec_test.go
@@ -0,0 +1,91 @@
+package main
+
+import (
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "git.curoverse.com/arvados.git/sdk/go/manifest"
+ "github.com/curoverse/dockerclient"
+ . "gopkg.in/check.v1"
+ "io"
+ //"io/ioutil"
+ //"log"
+ "os"
+ //"syscall"
+ "testing"
+ //"time"
+)
+
+// Test is the single "go test" entry point; it hands control to gocheck
+// through TestingT.
+func Test(t *testing.T) {
+	TestingT(t)
+}
+
+// TestSuite is the stateless gocheck suite that owns this file's tests.
+type TestSuite struct{}
+
+// Register the suite with gocheck.
+var _ = Suite(&TestSuite{})
+
+// ArvTestClient is a stub used as ContainerRunner.Api in tests.
+// NOTE(review): none of its fields are read by the methods in this file.
+type ArvTestClient struct {
+	c        *C
+	manifest string
+	success  bool
+}
+
+// KeepTestClient is a stub used as ContainerRunner.Kc in tests.
+// NOTE(review): none of its fields are read by the methods in this file.
+type KeepTestClient struct {
+	c        *C
+	manifest string
+	success  bool
+}
+
+// Create is a no-op stub: it ignores its arguments and reports success.
+func (this ArvTestClient) Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error {
+	return nil
+}
+
+// busyboxManifest is the manifest text of the test image collection: one
+// stream holding a single block and a single .tar file. The block locator
+// carries a permission hint of the form +A<signature>@<timestamp>; the "@"
+// must not be written as " at ", because spaces separate manifest tokens.
+var busyboxManifest = ". 59950b5bd8b0854ac44669c5559c4358+1321984+Af83e8fa245ab84f5817064ba4e99aed87060bea5@5692cadc 0:1321984:fc0db02f30724abc777d7ae2b2404c6d074f1e2ceca19912352aea30a42f50b7.tar\n"
+
+// busyboxPDH is the identifier under which ArvTestClient.Get serves
+// busyboxManifest.
+var busyboxPDH = "d4ab34d3d4f8a72f5c4973051ae69fab+122"
+
+// Get is a stub: for the "collections" resource with uuid busyboxPDH it
+// writes busyboxManifest into output, which must then be a *Collection.
+// Every other request leaves output untouched. It always returns nil.
+func (this ArvTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error {
+	if resourceType != "collections" || uuid != busyboxPDH {
+		return nil
+	}
+
+	coll := output.(*Collection)
+	coll.ManifestText = busyboxManifest
+	return nil
+}
+
+// Update is a no-op stub: it ignores its arguments and reports success.
+func (this ArvTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+	return nil
+}
+
+// PutHB is a no-op stub: it stores nothing and returns zero values with a
+// nil error.
+func (this KeepTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+	return "", 0, nil
+}
+
+// FileWrapper pairs an io.ReadCloser with a fixed length, which its Len
+// method reports.
+type FileWrapper struct {
+	io.ReadCloser
+	len uint64
+}
+
+// Len reports the length stored in the wrapper when it was built.
+func (this FileWrapper) Len() uint64 {
+	return this.len
+}
+
+func (this KeepTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
+ if filename == "fc0db02f30724abc777d7ae2b2404c6d074f1e2ceca19912352aea30a42f50b7.tar" {
+ rdr, err := os.Open("fc0db02f30724abc777d7ae2b2404c6d074f1e2ceca19912352aea30a42f50b7.tar")
+ return FileWrapper{rdr, 1321984}, err
+ }
+ return nil, nil
+}
+
+// TestLoadImage runs LoadImage against the stub Arvados and Keep clients and
+// the Docker daemon on the local unix socket, and requires it to succeed.
+// It needs a reachable Docker daemon and the busybox tarball in the current
+// directory.
+func (s *TestSuite) TestLoadImage(c *C) {
+	docker, err := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+	// Stop here if the client cannot be built; LoadImage would only fail
+	// later on a nil Docker client.
+	c.Assert(err, IsNil)
+
+	cr := ContainerRunner{
+		Docker: docker,
+		Api:    ArvTestClient{manifest: busyboxManifest},
+		Kc:     KeepTestClient{},
+	}
+	cr.Container.ContainerImage = busyboxPDH
+
+	// Without checking the result the test could never fail.
+	c.Check(cr.LoadImage(), IsNil)
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list