[ARVADOS] created: 2.1.0-486-gc1d1f0502
Git user
git at public.arvados.org
Mon Mar 15 20:44:02 UTC 2021
at c1d1f0502b8a0f049dba41da2f6b19a0d4b03d77 (commit)
commit c1d1f0502b8a0f049dba41da2f6b19a0d4b03d77
Author: Nico Cesar <nico at nicocesar.com>
Date: Mon Mar 15 16:42:26 2021 -0400
First step in a 1-to-1 abstraction with docker
Arvados-DCO-1.1-Signed-off-by: Nico Cesar <nico at curii.com>
diff --git a/lib/crunchrun/container_exec_types.go b/lib/crunchrun/container_exec_types.go
new file mode 100644
index 000000000..cc78a29df
--- /dev/null
+++ b/lib/crunchrun/container_exec_types.go
@@ -0,0 +1,294 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+// SPDX-License-Identifier: AGPL-3.0
+package crunchrun
+import (
+ "bufio"
+ "io"
+ "net"
+ "golang.org/x/net/context"
+// ContainerConfig holds all values needed for Docker and Singularity
+// to run a container. For Docker, it is similar to
+// github.com/docker/docker/api/types/container/Config
+// (see https://github.com/moby/moby/blob/master/api/types/container/config.go):
+// "It should hold only portable information about the container."
+// The Singularity equivalent is TBD.
+type ContainerConfig struct {
+ OpenStdin bool
+ StdinOnce bool
+ AttachStdin bool
+ AttachStdout bool
+ AttachStderr bool
+ Cmd []string
+ WorkingDir string
+ Env []string
+ Volumes map[string]struct{}
+// HostConfig holds all values needed for Docker and Singularity
+// to run a container that relate to the host. For Docker, it is
+// similar to github.com/docker/docker/api/types/container/HostConfig
+// (see https://github.com/moby/moby/blob/master/api/types/container/host_config.go),
+// which holds the settings "dependent of the host we are running on".
+// The Singularity equivalent is TBD.
+type HostConfig struct {
+ //important bits:
+ // - Binds:
+ // LogConfig
+ // Resources: see dockercontainer.Resources
+ // NetworkMode: see dockercontainer.NetworkMode
+// EndpointIPAMConfig represents IPAM configurations for the endpoint
+type EndpointIPAMConfig struct {
+ IPv4Address string `json:",omitempty"`
+ IPv6Address string `json:",omitempty"`
+ LinkLocalIPs []string `json:",omitempty"`
+// EndpointSettings stores the network endpoint details
+type EndpointSettings struct {
+ // Configurations
+ IPAMConfig *EndpointIPAMConfig
+ Links []string
+ Aliases []string
+ // Operational data
+ NetworkID string
+ EndpointID string
+ Gateway string
+ IPAddress string
+ IPPrefixLen int
+ IPv6Gateway string
+ GlobalIPv6Address string
+ GlobalIPv6PrefixLen int
+ MacAddress string
+ DriverOpts map[string]string
+// ----
+// NetworkingConfig holds all network-related values needed for Docker
+// and Singularity. For Docker, it is similar to
+// github.com/docker/docker/api/types/network/NetworkingConfig.
+// The Singularity equivalent is TBD.
+type NetworkingConfig struct {
+ EndpointsConfig map[string]*EndpointSettings
+// ContainerCreateResponse in the case of docker will be similar to
+// github.com/docker/docker/api/types/container/ContainerCreateCreatedBody
+// and for Singularity TBD.
+type ContainerCreateResponse struct {
+ // The ID of the created container
+ // Required: true
+ ID string
+ // Warnings encountered when creating the container
+ // Required: true
+ Warnings []string
+// ContainerStartOptions, for Docker, is similar to
+// github.com/docker/docker/api/types/ContainerStartOptions.
+// The Singularity equivalent is TBD.
+type ContainerStartOptions struct {
+ // FIXME: do we need this in this wrapper? We only ever use its zero
+ // value, just to comply with Docker's ContainerStart API, so dropping
+ // it may be the best option.
+ CheckpointID string
+ CheckpointDir string
+// ContainerRemoveOptions, for Docker, is similar to
+// github.com/docker/docker/api/types/ContainerRemoveOptions.
+// The Singularity equivalent is TBD.
+type ContainerRemoveOptions struct {
+ // FIXME: we *only* ever call this with ContainerRemoveOptions{Force: true},
+ // so maybe it should not be part of this abstraction.
+ Force bool
+// ContainerAttachOptions, for Docker, is similar to
+// github.com/docker/docker/api/types/ContainerAttachOptions.
+// The Singularity equivalent is TBD.
+type ContainerAttachOptions struct {
+ Stream bool
+ Stdin bool
+ Stdout bool
+ Stderr bool
+// ImageRemoveOptions, for Docker, is similar to
+// github.com/docker/docker/api/types/ImageRemoveOptions.
+// The Singularity equivalent is TBD.
+type ImageRemoveOptions struct {
+ Force bool
+ PruneChildren bool
+ //not used as far as I know
+// ContainerInspectResponse in the case of docker will be similar to
+// github.com/docker/docker/api/types/ContainerJSON
+// and for Singularity TBD.
+// MAYBE call it ExecRunnerContainer? since the struct is describing a container
+// from the underlying ExecRunner
+type ContainerInspectResponse struct {
+ //Important bits for us
+ // State = current checks: (nil, Running, Created)
+ State *ContainerState
+// ImageInspectResponse in the case of docker is similar to
+// github.com/docker/docker/api/types/ImageInspect
+// and for Singularity TBD.
+// MAYBE call it ExecRunnerImage? since the struct is describing an image
+// from the underlying ExecRunner
+type ImageInspectResponse struct {
+ // We don't use the response itself; we only call
+ // ImageInspectWithRaw(context.TODO(), imageID) to check whether the
+ // Docker image is already present. Maybe this could be replaced by an
+ // imagePresent(id string) bool method.
+ ID string
+// ImageLoadResponse returns information to the client about a load process.
+type ImageLoadResponse struct {
+ // Body must be closed to avoid a resource leak
+ Body io.ReadCloser
+ JSON bool
+// ImageDeleteResponseItem is a reply from ImageRemove.
+type ImageDeleteResponseItem struct {
+ // The image ID of an image that was deleted
+ Deleted string `json:"Deleted,omitempty"`
+ // The image ID of an image that was untagged
+ Untagged string `json:"Untagged,omitempty"`
+// ContainerState stores the container's running state.
+// It is part of ContainerJSONBase and is returned by the "inspect" command.
+type ContainerState struct {
+ Status string // String representation of the container state. Can be one of "created", "running", "paused", "restarting", "removing", "exited", or "dead"
+ Running bool
+ Paused bool
+ Restarting bool
+ OOMKilled bool
+ Dead bool
+ Pid int
+ ExitCode int
+ Error string
+ StartedAt string
+ FinishedAt string
+// HijackedResponse holds connection information for a hijacked request.
+// HijackedResponse is needed as an artifact that comes from docker.
+// We need to figure out if this is the best abstraction at this level
+// for now this is a copy and paste from docker package. Might evolve later.
+type HijackedResponse struct {
+ Conn net.Conn
+ Reader *bufio.Reader
+// Close closes the hijacked connection. The error returned by Conn.Close is discarded.
+func (h *HijackedResponse) Close() {
+ h.Conn.Close()
+// CloseWriter is implemented by connections that can close their
+// write side, so that no more input can be written to the stream.
+type CloseWriter interface {
+ CloseWrite() error
+// CloseWrite shuts down the write side of the hijacked connection when the
+// underlying Conn implements CloseWriter. For any other kind of connection
+// it does nothing and returns nil.
+func (h *HijackedResponse) CloseWrite() error {
+	writer, ok := h.Conn.(CloseWriter)
+	if !ok {
+		return nil
+	}
+	return writer.CloseWrite()
+}
+//------------ End of HijackedResponse
+// Similar to HijackedResponse, WaitCondition is kept here for now; how it is implemented for Singularity will be decided later.
+// WaitCondition is a type used to specify a container state for which
+// to wait.
+type WaitCondition string
+// Possible WaitCondition Values.
+// WaitConditionNotRunning (default) is used to wait for any of the non-running
+// states: "created", "exited", "dead", "removing", or "removed".
+// WaitConditionNextExit is used to wait for the next time the state changes
+// to a non-running state. If the state is currently "created" or "exited",
+// this would cause Wait() to block until either the container runs and exits
+// or is removed.
+// WaitConditionRemoved is used to wait for the container to be removed.
+const (
+ WaitConditionNotRunning WaitCondition = "not-running"
+ WaitConditionNextExit WaitCondition = "next-exit"
+ WaitConditionRemoved WaitCondition = "removed"
+//------------ End of WaitCondition
+// ContainerWaitOKBody
+// Yet another copy, this time from docker/api/types/container/container_wait.go.
+// That file is generated from swagger, so I don't think it is a good idea to
+// have it here, but for now I'll finish creating the abstraction layer.
+// ContainerWaitOKBodyError container waiting error, if any
+// swagger:model ContainerWaitOKBodyError
+type ContainerWaitOKBodyError struct {
+ // Details of an error
+ Message string `json:"Message,omitempty"`
+// ContainerWaitOKBody OK response to ContainerWait operation
+// swagger:model ContainerWaitOKBody
+type ContainerWaitOKBody struct {
+ // error
+ // Required: true
+ Error *ContainerWaitOKBodyError `json:"Error"`
+ // Exit code of the container
+ // Required: true
+ StatusCode int64 `json:"StatusCode"`
+//------------ End of ContainerWaitOKBody
+// ThinContainerExecRunner is the "common denominator" interface for all ExecRunners
+// (either Docker or Singularity, or more to come). For now it is based on the
+// ThinDockerClient interface, with our own types instead of the ones that come
+// from Docker.
+type ThinContainerExecRunner interface {
+ GetContainerConfig() (ContainerConfig, error)
+ GetHostConfig() (HostConfig, error)
+ ContainerAttach(ctx context.Context, container string, options ContainerAttachOptions) (HijackedResponse, error)
+ ContainerCreate(ctx context.Context, config ContainerConfig, hostConfig HostConfig, networkingConfig *NetworkingConfig, containerName string) (ContainerCreateResponse, error)
+ ContainerStart(ctx context.Context, container string, options ContainerStartOptions) error
+ ContainerRemove(ctx context.Context, container string, options ContainerRemoveOptions) error
+ ContainerWait(ctx context.Context, container string, condition WaitCondition) (<-chan ContainerWaitOKBody, <-chan error)
+ ContainerInspect(ctx context.Context, id string) (ContainerInspectResponse, error)
+ ImageInspectWithRaw(ctx context.Context, image string) (ImageInspectResponse, []byte, error)
+ ImageLoad(ctx context.Context, input io.Reader, quiet bool) (ImageLoadResponse, error)
+ ImageRemove(ctx context.Context, image string, options ImageRemoveOptions) ([]ImageDeleteResponseItem, error)
diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 969682f46..fb6aaee99 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -95,7 +95,12 @@ type PsProcess interface {
// ContainerRunner is the main stateful struct used for a single execution of a
// container.
type ContainerRunner struct {
- Docker ThinDockerClient
+ ContainerExecRunner ThinContainerExecRunner
+ //Docker ThinDockerClient
+ ContainerConfig dockercontainer.Config //FIXME: translate this to the ThinContainerRunner interface
+ HostConfig dockercontainer.HostConfig //FIXME: translate this to the ThinContainerRunner interface
+ //--------------
// Dispatcher client is initialized with the Dispatcher token.
// This is a privileged token used to manage container status
@@ -119,35 +124,33 @@ type ContainerRunner struct {
ContainerArvClient IArvadosClient
ContainerKeepClient IKeepClient
- Container arvados.Container
- ContainerConfig dockercontainer.Config
- HostConfig dockercontainer.HostConfig
- token string
- ContainerID string
- ExitCode *int
- NewLogWriter NewLogWriter
- loggingDone chan bool
- CrunchLog *ThrottledLogger
- Stdout io.WriteCloser
- Stderr io.WriteCloser
- logUUID string
- logMtx sync.Mutex
- LogCollection arvados.CollectionFileSystem
- LogsPDH *string
- RunArvMount RunArvMount
- MkTempDir MkTempDir
- ArvMount *exec.Cmd
- ArvMountPoint string
- HostOutputDir string
- Binds []string
- Volumes map[string]struct{}
- OutputPDH *string
- SigChan chan os.Signal
- ArvMountExit chan error
- SecretMounts map[string]arvados.Mount
- MkArvClient func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
- finalState string
- parentTemp string
+ Container arvados.Container
+ token string
+ ContainerID string
+ ExitCode *int
+ NewLogWriter NewLogWriter
+ loggingDone chan bool
+ CrunchLog *ThrottledLogger
+ Stdout io.WriteCloser
+ Stderr io.WriteCloser
+ logUUID string
+ logMtx sync.Mutex
+ LogCollection arvados.CollectionFileSystem
+ LogsPDH *string
+ RunArvMount RunArvMount
+ MkTempDir MkTempDir
+ ArvMount *exec.Cmd
+ ArvMountPoint string
+ HostOutputDir string
+ Binds []string
+ Volumes map[string]struct{}
+ OutputPDH *string
+ SigChan chan os.Signal
+ ArvMountExit chan error
+ SecretMounts map[string]arvados.Mount
+ MkArvClient func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
+ finalState string
+ parentTemp string
statLogger io.WriteCloser
statReporter *crunchstat.Reporter
@@ -209,7 +212,7 @@ func (runner *ContainerRunner) stop(sig os.Signal) {
runner.cCancelled = true
runner.CrunchLog.Printf("removing container")
- err := runner.Docker.ContainerRemove(context.TODO(), runner.ContainerID, dockertypes.ContainerRemoveOptions{Force: true})
+ err := runner.ContainerExecRunner.ContainerRemove(context.TODO(), runner.ContainerID, ContainerRemoveOptions{Force: true})
if err != nil {
runner.CrunchLog.Printf("error removing container: %s", err)
@@ -283,7 +286,7 @@ func (runner *ContainerRunner) LoadImage() (err error) {
runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
- _, _, err = runner.Docker.ImageInspectWithRaw(context.TODO(), imageID)
+ _, _, err = runner.ContainerExecRunner.ImageInspectWithRaw(context.TODO(), imageID)
if err != nil {
runner.CrunchLog.Print("Loading Docker image from keep")
@@ -293,7 +296,7 @@ func (runner *ContainerRunner) LoadImage() (err error) {
return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
- response, err := runner.Docker.ImageLoad(context.TODO(), readCloser, true)
+ response, err := runner.ContainerExecRunner.ImageLoad(context.TODO(), readCloser, true)
if err != nil {
return fmt.Errorf("While loading container image into Docker: %v", err)
@@ -976,8 +979,8 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
stdinUsed := stdinRdr != nil || len(stdinJSON) != 0
- response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
- dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
+ response, err := runner.ContainerExecRunner.ContainerAttach(context.TODO(), runner.ContainerID,
+ ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
if err != nil {
return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
@@ -1063,16 +1066,19 @@ func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) {
func (runner *ContainerRunner) CreateContainer() error {
runner.CrunchLog.Print("Creating Docker container")
- runner.ContainerConfig.Cmd = runner.Container.Command
+	// Fetch the runner's base configuration. Each error is checked right
+	// away; otherwise the second call would overwrite the first error.
+	containerConfig, err := runner.ContainerExecRunner.GetContainerConfig()
+	if err != nil {
+		return fmt.Errorf("While getting container config: %v", err)
+	}
+	hostConfig, err := runner.ContainerExecRunner.GetHostConfig()
+	if err != nil {
+		return fmt.Errorf("While getting host config: %v", err)
+	}
+ containerConfig.Cmd = runner.Container.Command
if runner.Container.Cwd != "." {
- runner.ContainerConfig.WorkingDir = runner.Container.Cwd
+ containerConfig.WorkingDir = runner.Container.Cwd
for k, v := range runner.Container.Environment {
- runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
+ containerConfig.Env = append(containerConfig.Env, k+"="+v)
- runner.ContainerConfig.Volumes = runner.Volumes
+ containerConfig.Volumes = runner.Volumes
maxRAM := int64(runner.Container.RuntimeConstraints.RAM)
minDockerRAM := int64(16)
@@ -1099,7 +1105,7 @@ func (runner *ContainerRunner) CreateContainer() error {
if err != nil {
return err
- runner.ContainerConfig.Env = append(runner.ContainerConfig.Env,
+ containerConfig.Env = append(containerConfig.Env,
@@ -1114,13 +1120,13 @@ func (runner *ContainerRunner) CreateContainer() error {
_, stdinUsed := runner.Container.Mounts["stdin"]
- runner.ContainerConfig.OpenStdin = stdinUsed
- runner.ContainerConfig.StdinOnce = stdinUsed
- runner.ContainerConfig.AttachStdin = stdinUsed
- runner.ContainerConfig.AttachStdout = true
- runner.ContainerConfig.AttachStderr = true
+ containerConfig.OpenStdin = stdinUsed
+ containerConfig.StdinOnce = stdinUsed
+ containerConfig.AttachStdin = stdinUsed
+ containerConfig.AttachStdout = true
+ containerConfig.AttachStderr = true
- createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID)
+ createdBody, err := runner.ContainerExecRunner.ContainerCreate(context.TODO(), containerConfig, hostConfig, nil, runner.Container.UUID)
if err != nil {
return fmt.Errorf("While creating container: %v", err)
@@ -1138,8 +1144,8 @@ func (runner *ContainerRunner) StartContainer() error {
if runner.cCancelled {
return ErrCancelled
- err := runner.Docker.ContainerStart(context.TODO(), runner.ContainerID,
- dockertypes.ContainerStartOptions{})
+ err := runner.ContainerExecRunner.ContainerStart(context.TODO(), runner.ContainerID,
+ ContainerStartOptions{})
if err != nil {
var advice string
if m, e := regexp.MatchString("(?ms).*(exec|System error).*(no such file or directory|file not found).*", err.Error()); m && e == nil {
@@ -1156,7 +1162,7 @@ func (runner *ContainerRunner) WaitFinish() error {
var runTimeExceeded <-chan time.Time
runner.CrunchLog.Print("Waiting for container to finish")
- waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, dockercontainer.WaitConditionNotRunning)
+ waitOk, waitErr := runner.ContainerExecRunner.ContainerWait(context.TODO(), runner.ContainerID, WaitConditionNotRunning)
arvMountExit := runner.ArvMountExit
if timeout := runner.Container.SchedulingParameters.MaxRunTime; timeout > 0 {
runTimeExceeded = time.After(time.Duration(timeout) * time.Second)
@@ -1170,7 +1176,7 @@ func (runner *ContainerRunner) WaitFinish() error {
for range time.NewTicker(runner.containerWatchdogInterval).C {
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(runner.containerWatchdogInterval))
- ctr, err := runner.Docker.ContainerInspect(ctx, runner.ContainerID)
+ ctr, err := runner.ContainerExecRunner.ContainerInspect(ctx, runner.ContainerID)
done := runner.cRemoved || runner.ExitCode != nil
@@ -1727,14 +1733,14 @@ func (runner *ContainerRunner) fetchContainerRecord() error {
func NewContainerRunner(dispatcherClient *arvados.Client,
dispatcherArvClient IArvadosClient,
dispatcherKeepClient IKeepClient,
- docker ThinDockerClient,
+ containerRunner ThinContainerExecRunner,
containerUUID string) (*ContainerRunner, error) {
cr := &ContainerRunner{
dispatcherClient: dispatcherClient,
DispatcherArvClient: dispatcherArvClient,
DispatcherKeepClient: dispatcherKeepClient,
- Docker: docker,
+ ContainerExecRunner: containerRunner,
cr.NewLogWriter = cr.NewArvLogWriter
cr.RunArvMount = cr.ArvMountCmd
@@ -1794,7 +1800,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
memprofile := flags.String("memprofile", "", "write memory profile to `file` after running container")
flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
+ containerRunner := flags.String("container-runner", "docker",
+ `Specify the container runner. available options: docker, singularity.
+ `)
ignoreDetachFlag := false
if len(args) > 0 && args[0] == "-no-detach" {
// This process was invoked by a parent process, which
@@ -1863,22 +1871,41 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
kc.Retries = 4
- // API version 1.21 corresponds to Docker 1.9, which is currently the
- // minimum version we want to support.
- docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
+ var cr *ContainerRunner
+ if *containerRunner == "docker" {
+ // API version 1.21 corresponds to Docker 1.9, which is currently the
+ // minimum version we want to support.
+ docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
- cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, docker, containerID)
- if err != nil {
- log.Print(err)
- return 1
- }
- if dockererr != nil {
- cr.CrunchLog.Printf("%s: %v", containerID, dockererr)
- cr.checkBrokenNode(dockererr)
- cr.CrunchLog.Close()
- return 1
- }
+ cr, err = NewContainerRunner(arvados.NewClientFromEnv(), api, kc, adapter(docker), containerID)
+ if err != nil {
+ log.Print(err)
+ return 1
+ }
+ if dockererr != nil {
+ cr.CrunchLog.Printf("%s: %v", containerID, dockererr)
+ cr.checkBrokenNode(dockererr)
+ cr.CrunchLog.Close()
+ return 1
+ }
+ } else {
+ // Singularity
+ singularity, singularityerr := NewSingularityClient()
+ cr, err = NewContainerRunner(arvados.NewClientFromEnv(), api, kc, singularity, containerID)
+ if err != nil {
+ log.Print(err)
+ return 1
+ }
+ if singularityerr != nil {
+ cr.CrunchLog.Printf("%s: %v", containerID, singularityerr)
+ //cr.checkBrokenNode(singularityrerr) //
+ cr.CrunchLog.Close()
+ return 1
+ }
+ }
cr.gateway = Gateway{
Address: os.Getenv("GatewayAddress"),
AuthSecret: os.Getenv("GatewayAuthSecret"),
diff --git a/lib/crunchrun/crunchrun_test.go b/lib/crunchrun/crunchrun_test.go
index dbdaa6293..ddf40543e 100644
--- a/lib/crunchrun/crunchrun_test.go
+++ b/lib/crunchrun/crunchrun_test.go
@@ -439,7 +439,7 @@ func (client *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename s
func (s *TestSuite) TestLoadImage(c *C) {
cr, err := NewContainerRunner(s.client, &ArvTestClient{},
- &KeepTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ &KeepTestClient{}, adapter(s.docker), "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
c.Assert(err, IsNil)
kc := &KeepTestClient{}
@@ -447,10 +447,10 @@ func (s *TestSuite) TestLoadImage(c *C) {
cr.ContainerArvClient = &ArvTestClient{}
cr.ContainerKeepClient = kc
- _, err = cr.Docker.ImageRemove(nil, hwImageID, dockertypes.ImageRemoveOptions{})
+ _, err = cr.ContainerExecRunner.ImageRemove(nil, hwImageID, ImageRemoveOptions{})
c.Check(err, IsNil)
- _, _, err = cr.Docker.ImageInspectWithRaw(nil, hwImageID)
+ _, _, err = cr.ContainerExecRunner.ImageInspectWithRaw(nil, hwImageID)
c.Check(err, NotNil)
cr.Container.ContainerImage = hwPDH
@@ -463,13 +463,13 @@ func (s *TestSuite) TestLoadImage(c *C) {
c.Check(err, IsNil)
defer func() {
- cr.Docker.ImageRemove(nil, hwImageID, dockertypes.ImageRemoveOptions{})
+ cr.ContainerExecRunner.ImageRemove(nil, hwImageID, ImageRemoveOptions{})
c.Check(kc.Called, Equals, true)
c.Check(cr.ContainerConfig.Image, Equals, hwImageID)
- _, _, err = cr.Docker.ImageInspectWithRaw(nil, hwImageID)
+ _, _, err = cr.ContainerExecRunner.ImageInspectWithRaw(nil, hwImageID)
c.Check(err, IsNil)
// (2) Test using image that's already loaded
@@ -574,7 +574,7 @@ func (s *TestSuite) TestLoadImageArvError(c *C) {
func (s *TestSuite) TestLoadImageKeepError(c *C) {
// (2) Keep error
kc := &KeepErrorTestClient{}
- cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, adapter(s.docker), "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
c.Assert(err, IsNil)
cr.ContainerArvClient = &ArvTestClient{}
@@ -604,7 +604,7 @@ func (s *TestSuite) TestLoadImageCollectionError(c *C) {
func (s *TestSuite) TestLoadImageKeepReadError(c *C) {
// (4) Collection doesn't contain image
kc := &KeepReadErrorTestClient{}
- cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, adapter(s.docker), "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
c.Assert(err, IsNil)
cr.Container.ContainerImage = hwPDH
cr.ContainerArvClient = &ArvTestClient{}
@@ -653,7 +653,7 @@ func (s *TestSuite) TestRunContainer(c *C) {
kc := &KeepTestClient{}
defer kc.Close()
- cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, adapter(s.docker), "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
c.Assert(err, IsNil)
cr.ContainerArvClient = &ArvTestClient{}
@@ -777,7 +777,7 @@ func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, exi
s.docker.api = api
kc := &KeepTestClient{}
defer kc.Close()
- cr, err = NewContainerRunner(s.client, api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err = NewContainerRunner(s.client, api, kc, adapter(s.docker), "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
c.Assert(err, IsNil)
s.runner = cr
cr.statInterval = 100 * time.Millisecond
@@ -1136,7 +1136,7 @@ func (s *TestSuite) testStopContainer(c *C, setup func(cr *ContainerRunner)) {
api := &ArvTestClient{Container: rec}
kc := &KeepTestClient{}
defer kc.Close()
- cr, err := NewContainerRunner(s.client, api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, api, kc, adapter(s.docker), "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
c.Assert(err, IsNil)
cr.RunArvMount = func([]string, string) (*exec.Cmd, error) { return nil, nil }
cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
@@ -1621,7 +1621,7 @@ func (s *TestSuite) stdoutErrorRunHelper(c *C, record string, fn func(t *TestDoc
api = &ArvTestClient{Container: rec}
kc := &KeepTestClient{}
defer kc.Close()
- cr, err = NewContainerRunner(s.client, api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err = NewContainerRunner(s.client, api, kc, adapter(s.docker), "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
c.Assert(err, IsNil)
am := &ArvMountCmdLine{}
cr.RunArvMount = am.ArvMountTest
diff --git a/lib/crunchrun/docker_adapter.go b/lib/crunchrun/docker_adapter.go
new file mode 100644
index 000000000..795d06b56
--- /dev/null
+++ b/lib/crunchrun/docker_adapter.go
@@ -0,0 +1,219 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+// SPDX-License-Identifier: AGPL-3.0
+package crunchrun
+import (
+ "io"
+ dockertypes "github.com/docker/docker/api/types"
+ dockercontainer "github.com/docker/docker/api/types/container"
+ dockernetwork "github.com/docker/docker/api/types/network"
+ "golang.org/x/net/context"
+// DockerAdapter wraps a ThinDockerClient so that it can be used as a
+// ThinContainerExecRunner: each method translates crunchrun's own
+// request/response types to and from the Docker API types and delegates
+// the call to the wrapped client.
+type DockerAdapter struct {
+ docker ThinDockerClient
+ containerConfig ContainerConfig
+ hostConfig HostConfig
+// ContainerAttach attaches to the container's streams through the wrapped
+// Docker client and repackages the hijacked connection in crunchrun's own
+// HijackedResponse type. The Docker client's error is returned unchanged.
+func (a *DockerAdapter) ContainerAttach(ctx context.Context, container string, options ContainerAttachOptions) (HijackedResponse, error) {
+	resp, err := a.docker.ContainerAttach(ctx, container, dockertypes.ContainerAttachOptions{
+		Stream: options.Stream,
+		Stdin:  options.Stdin,
+		Stdout: options.Stdout,
+		Stderr: options.Stderr,
+	})
+	return HijackedResponse{Conn: resp.Conn, Reader: resp.Reader}, err
+}
+// ContainerCreate translates crunchrun's container and networking
+// configuration into the Docker API equivalents, asks the wrapped Docker
+// client to create the container, and returns the new container's ID and
+// warnings. The Docker client's error is returned unchanged.
+//
+// hostConfig is accepted for interface compatibility only: HostConfig has
+// no fields yet, so an empty dockercontainer.HostConfig is sent to Docker.
+// A nil networkingConfig is passed to Docker as nil.
+func (a *DockerAdapter) ContainerCreate(ctx context.Context, config ContainerConfig, hostConfig HostConfig, networkingConfig *NetworkingConfig, containerName string) (ContainerCreateResponse, error) {
+	var dockerNetworkConfig *dockernetwork.NetworkingConfig
+	if networkingConfig != nil {
+		// The map has to be allocated before it is filled: assigning
+		// to an entry of a nil map panics. It stays nil when there
+		// are no endpoints to translate.
+		var dockerEndpointsConfig map[string]*dockernetwork.EndpointSettings
+		if n := len(networkingConfig.EndpointsConfig); n > 0 {
+			dockerEndpointsConfig = make(map[string]*dockernetwork.EndpointSettings, n)
+		}
+		for k, v := range networkingConfig.EndpointsConfig {
+			if v == nil {
+				// Keep the key; there are no settings to translate.
+				dockerEndpointsConfig[k] = nil
+				continue
+			}
+			// IPAMConfig is optional, so only translate it when set.
+			var dockerIpamConfig *dockernetwork.EndpointIPAMConfig
+			if v.IPAMConfig != nil {
+				dockerIpamConfig = &dockernetwork.EndpointIPAMConfig{
+					IPv4Address:  v.IPAMConfig.IPv4Address,
+					IPv6Address:  v.IPAMConfig.IPv6Address,
+					LinkLocalIPs: v.IPAMConfig.LinkLocalIPs,
+				}
+			}
+			dockerEndpointsConfig[k] = &dockernetwork.EndpointSettings{
+				IPAMConfig:          dockerIpamConfig,
+				Links:               v.Links,
+				Aliases:             v.Aliases,
+				NetworkID:           v.NetworkID,
+				EndpointID:          v.EndpointID,
+				Gateway:             v.Gateway,
+				IPAddress:           v.IPAddress,
+				IPPrefixLen:         v.IPPrefixLen,
+				IPv6Gateway:         v.IPv6Gateway,
+				GlobalIPv6Address:   v.GlobalIPv6Address,
+				GlobalIPv6PrefixLen: v.GlobalIPv6PrefixLen,
+				MacAddress:          v.MacAddress,
+				DriverOpts:          v.DriverOpts,
+			}
+		}
+		dockerNetworkConfig = &dockernetwork.NetworkingConfig{
+			EndpointsConfig: dockerEndpointsConfig,
+		}
+	}
+
+	dockerConfig := dockercontainer.Config{
+		OpenStdin:    config.OpenStdin,
+		StdinOnce:    config.StdinOnce,
+		AttachStdin:  config.AttachStdin,
+		AttachStdout: config.AttachStdout,
+		AttachStderr: config.AttachStderr,
+		Cmd:          config.Cmd,
+		WorkingDir:   config.WorkingDir,
+		Env:          config.Env,
+		Volumes:      config.Volumes,
+	}
+	dockerHostConfig := dockercontainer.HostConfig{}
+
+	dockerResponse, dockerErr := a.docker.ContainerCreate(ctx,
+		&dockerConfig,
+		&dockerHostConfig, dockerNetworkConfig, containerName)
+
+	adapterResponse := ContainerCreateResponse{
+		ID:       dockerResponse.ID,
+		Warnings: dockerResponse.Warnings,
+	}
+	return adapterResponse, dockerErr
+}
+// ContainerStart starts the named container through the wrapped Docker
+// client, forwarding the checkpoint options, and returns the client's error.
+func (a *DockerAdapter) ContainerStart(ctx context.Context, container string, options ContainerStartOptions) error {
+	return a.docker.ContainerStart(ctx, container, dockertypes.ContainerStartOptions{
+		CheckpointID:  options.CheckpointID,
+		CheckpointDir: options.CheckpointDir,
+	})
+}
+// ContainerRemove removes the named container through the wrapped Docker
+// client, forwarding the Force option, and returns the client's error.
+func (a *DockerAdapter) ContainerRemove(ctx context.Context, container string, options ContainerRemoveOptions) error {
+	return a.docker.ContainerRemove(ctx, container, dockertypes.ContainerRemoveOptions{
+		Force: options.Force,
+	})
+}
+// ContainerInspect asks the wrapped Docker client for the container's
+// details and returns its state in crunchrun's own ContainerState type.
+//
+// If the Docker client reports an error, a zero ContainerInspectResponse is
+// returned together with that error. If Docker returns no state, the
+// response's State is nil, which callers already treat as a distinct case.
+func (a *DockerAdapter) ContainerInspect(ctx context.Context, id string) (ContainerInspectResponse, error) {
+	dockerResponse, dockerErr := a.docker.ContainerInspect(ctx, id)
+	if dockerErr != nil {
+		// The Docker response is meaningless on error, and reading
+		// State through a nil embedded base would panic.
+		return ContainerInspectResponse{}, dockerErr
+	}
+	if dockerResponse.ContainerJSONBase == nil || dockerResponse.State == nil {
+		return ContainerInspectResponse{}, nil
+	}
+	dockerState := dockerResponse.State
+	return ContainerInspectResponse{
+		State: &ContainerState{
+			// NOTE(review): Status was not copied before; the comment on
+			// ContainerInspectResponse lists "Created" among the states
+			// that are checked, which presumably needs it. Confirm against
+			// the caller.
+			Status:     dockerState.Status,
+			Running:    dockerState.Running,
+			Paused:     dockerState.Paused,
+			Restarting: dockerState.Restarting,
+			OOMKilled:  dockerState.OOMKilled,
+			Dead:       dockerState.Dead,
+			Pid:        dockerState.Pid,
+			ExitCode:   dockerState.ExitCode,
+			Error:      dockerState.Error,
+			StartedAt:  dockerState.StartedAt,
+			FinishedAt: dockerState.FinishedAt,
+		},
+	}, nil
+}
+// ContainerWait waits, through the wrapped Docker client, for the container
+// to reach the given condition. It returns two channels: one delivers the
+// wait result translated to crunchrun's ContainerWaitOKBody, the other
+// delivers the Docker client's error.
+//
+// A single goroutine forwards whichever outcome the Docker client produces
+// first and then exits. Both returned channels are buffered, so the
+// goroutine never blocks on a caller that has stopped listening.
+// NOTE(review): this assumes the Docker client delivers at most one value,
+// on exactly one of its two channels, and reports context cancellation on
+// its error channel -- confirm against the Docker client documentation.
+func (a *DockerAdapter) ContainerWait(ctx context.Context, container string, condition WaitCondition) (<-chan ContainerWaitOKBody, <-chan error) {
+	dockerBodies, dockerErrs := a.docker.ContainerWait(ctx, container, dockercontainer.WaitCondition(condition))
+
+	adapterBodies := make(chan ContainerWaitOKBody, 1)
+	adapterErrs := make(chan error, 1)
+	go func() {
+		select {
+		case dockerMsg, ok := <-dockerBodies:
+			if !ok {
+				// Channel closed without a result: nothing to forward.
+				return
+			}
+			var adapterError *ContainerWaitOKBodyError
+			if dockerMsg.Error != nil {
+				adapterError = &ContainerWaitOKBodyError{
+					Message: dockerMsg.Error.Message,
+				}
+			}
+			adapterBodies <- ContainerWaitOKBody{
+				Error:      adapterError,
+				StatusCode: dockerMsg.StatusCode,
+			}
+		case err, ok := <-dockerErrs:
+			if ok {
+				adapterErrs <- err
+			}
+		}
+	}()
+	return adapterBodies, adapterErrs
+}
+// ImageInspectWithRaw looks up the image through the wrapped Docker client
+// and returns its ID in crunchrun's ImageInspectResponse type, together
+// with the raw bytes and the error reported by the client.
+func (a *DockerAdapter) ImageInspectWithRaw(ctx context.Context, image string) (ImageInspectResponse, []byte, error) {
+	inspected, raw, err := a.docker.ImageInspectWithRaw(ctx, image)
+	return ImageInspectResponse{ID: inspected.ID}, raw, err
+}
+// ImageLoad loads an image from input through the wrapped Docker client and
+// returns the client's response in crunchrun's ImageLoadResponse type,
+// together with the client's error.
+func (a *DockerAdapter) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (ImageLoadResponse, error) {
+	loaded, err := a.docker.ImageLoad(ctx, input, quiet)
+	return ImageLoadResponse{Body: loaded.Body, JSON: loaded.JSON}, err
+}
+// ImageRemove removes the image through the wrapped Docker client and
+// returns one ImageDeleteResponseItem per item reported by Docker, together
+// with the client's error. The result is a nil slice when Docker reports
+// no items.
+func (a *DockerAdapter) ImageRemove(ctx context.Context, image string, options ImageRemoveOptions) ([]ImageDeleteResponseItem, error) {
+	dockerItems, err := a.docker.ImageRemove(ctx, image, dockertypes.ImageRemoveOptions{
+		Force:         options.Force,
+		PruneChildren: options.PruneChildren,
+	})
+	var items []ImageDeleteResponseItem
+	for i := range dockerItems {
+		items = append(items, ImageDeleteResponseItem{
+			Deleted:  dockerItems[i].Deleted,
+			Untagged: dockerItems[i].Untagged,
+		})
+	}
+	return items, err
+}
+func (a *DockerAdapter) GetContainerConfig() (ContainerConfig, error) {
+ return a.containerConfig, nil
+func (a *DockerAdapter) GetHostConfig() (HostConfig, error) {
+ return a.hostConfig, nil
+// adapter wraps a ThinDockerClient in a DockerAdapter so that it can be
+// used wherever a ThinContainerExecRunner is expected.
+func adapter(docker ThinDockerClient) ThinContainerExecRunner {
+	return &DockerAdapter{docker: docker}
+}
diff --git a/lib/crunchrun/singularity.go b/lib/crunchrun/singularity.go
new file mode 100644
index 000000000..91ec0ea11
--- /dev/null
+++ b/lib/crunchrun/singularity.go
@@ -0,0 +1,86 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+// SPDX-License-Identifier: AGPL-3.0
+package crunchrun
+import (
+ "fmt"
+ "io"
+ "golang.org/x/net/context"
+// SingularityClient holds the container and host configuration that its
+// GetContainerConfig and GetHostConfig methods hand back. The remaining
+// methods in this file are placeholders.
+type SingularityClient struct {
+	// containerConfig is the value returned by GetContainerConfig.
+	containerConfig ContainerConfig
+	// hostConfig is the value returned by GetHostConfig.
+	hostConfig HostConfig
+}
+// GetContainerConfig returns the ContainerConfig stored on the client.
+// The error result is always nil.
+func (c SingularityClient) GetContainerConfig() (ContainerConfig, error) {
+	cfg := c.containerConfig
+	return cfg, nil
+}
+// GetHostConfig returns the HostConfig stored on the client. The error
+// result is always nil.
+func (c SingularityClient) GetHostConfig() (HostConfig, error) {
+	cfg := c.hostConfig
+	return cfg, nil
+}
+func (c SingularityClient) ContainerAttach(ctx context.Context, container string, options ContainerAttachOptions) (HijackedResponse, error) {
+ fmt.Printf("placeholder for container ContainerAttach %s", container)
+ return HijackedResponse{}, nil
+func (c SingularityClient) ContainerCreate(ctx context.Context, config ContainerConfig, hostConfig HostConfig, networkingConfig *NetworkingConfig, containerName string) (ContainerCreateResponse, error) {
+ fmt.Printf("placeholder for container ContainerCreate %s", containerName)
+ return ContainerCreateResponse{}, nil
+func (c SingularityClient) ContainerStart(ctx context.Context, container string, options ContainerStartOptions) error {
+ fmt.Printf("placeholder for container ContainerStart %s", container)
+ return nil
+func (c SingularityClient) ContainerRemove(ctx context.Context, container string, options ContainerRemoveOptions) error {
+ fmt.Printf("placeholder for container ContainerRemove %s", container)
+ return nil
+// ContainerWait is a placeholder: it prints a fixed message to standard
+// output and returns two freshly created unbuffered channels. Nothing in
+// this function sends on or closes either channel, so a caller that
+// receives from them will not get a value from this stub. ctx, container
+// and condition are not used.
+func (c SingularityClient) ContainerWait(ctx context.Context, container string, condition WaitCondition) (<-chan ContainerWaitOKBody, <-chan error) {
+	fmt.Print("placeholder for ContainerWait")
+	return make(chan ContainerWaitOKBody), make(chan error)
+}
+func (c SingularityClient) ContainerInspect(ctx context.Context, id string) (ContainerInspectResponse, error) {
+ fmt.Printf("placeholder for container ContainerInspect %s", id)
+ return ContainerInspectResponse{}, nil
+func (c SingularityClient) ImageInspectWithRaw(ctx context.Context, image string) (ImageInspectResponse, []byte, error) {
+ fmt.Printf("placeholder for ImageInspectWithRaw() %s", image)
+ return ImageInspectResponse{}, []byte(""), nil
+func (c SingularityClient) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (ImageLoadResponse, error) {
+ fmt.Printf("placeholder for ImageLoad")
+ return ImageLoadResponse{}, nil
+func (c SingularityClient) ImageRemove(ctx context.Context, image string, options ImageRemoveOptions) ([]ImageDeleteResponseItem, error) {
+ fmt.Printf("placeholder for ImageRemove")
+ var responses []ImageDeleteResponseItem
+ tmp := ImageDeleteResponseItem{}
+ responses = append(responses, tmp)
+ return responses, nil
+func NewSingularityClient() (SingularityClient, error) {
+ var s = &SingularityClient{}
+ return *s, nil
More information about the arvados-commits
mailing list