[ARVADOS] created: 2.1.0-486-gc1d1f0502

Git user git at public.arvados.org
Mon Mar 15 20:44:02 UTC 2021


        at  c1d1f0502b8a0f049dba41da2f6b19a0d4b03d77 (commit)


commit c1d1f0502b8a0f049dba41da2f6b19a0d4b03d77
Author: Nico Cesar <nico at nicocesar.com>
Date:   Mon Mar 15 16:42:26 2021 -0400

    First step in a 1-to-1 abstraction with docker
    
    Arvados-DCO-1.1-Signed-off-by: Nico Cesar <nico at curii.com>

diff --git a/lib/crunchrun/container_exec_types.go b/lib/crunchrun/container_exec_types.go
new file mode 100644
index 000000000..cc78a29df
--- /dev/null
+++ b/lib/crunchrun/container_exec_types.go
@@ -0,0 +1,294 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package crunchrun
+
+import (
+	"bufio"
+	"io"
+	"net"
+
+	"golang.org/x/net/context"
+)
+
+// ContainerConfig holds all values needed for Docker and Singularity
+// to run a container. In the case of docker is similar to
+// github.com/docker/docker/api/types/container/Config
+// see https://github.com/moby/moby/blob/master/api/types/container/config.go
+// "It should hold only portable information about the container."
+// and for Singularity TBD
+type ContainerConfig struct {
+	OpenStdin    bool
+	StdinOnce    bool
+	AttachStdin  bool
+	AttachStdout bool
+	AttachStderr bool
+
+	Cmd        []string
+	WorkingDir string
+	Env        []string
+	Volumes    map[string]struct{}
+}
+
+// HostConfig holds all values needed for Docker and Singularity
+// to run a container related to the host. In the case of docker is
+// similar to github.com/docker/docker/api/types/container/HostConfig
+// see https://github.com/moby/moby/blob/master/api/types/container/host_config.go
+// "dependent of the host we are running on".
+// and for Singularity TBD
+type HostConfig struct {
+	//important bits:
+	// - Binds:
+	// LogConfig
+	// Resources: see dockercontainer.Resources
+	// NetworkMode: see dockercontainer.NetworkMode
+}
+
+// ---- NETROWKING STUFF
+// EndpointIPAMConfig represents IPAM configurations for the endpoint
+type EndpointIPAMConfig struct {
+	IPv4Address  string   `json:",omitempty"`
+	IPv6Address  string   `json:",omitempty"`
+	LinkLocalIPs []string `json:",omitempty"`
+}
+
+// EndpointSettings stores the network endpoint details
+type EndpointSettings struct {
+	// Configurations
+	IPAMConfig *EndpointIPAMConfig
+	Links      []string
+	Aliases    []string
+	// Operational data
+	NetworkID           string
+	EndpointID          string
+	Gateway             string
+	IPAddress           string
+	IPPrefixLen         int
+	IPv6Gateway         string
+	GlobalIPv6Address   string
+	GlobalIPv6PrefixLen int
+	MacAddress          string
+	DriverOpts          map[string]string
+}
+
+// ----
+// NetworkingConfig holds all values needed for Docker and Singularity
+// related network. In the case of docker is similar to
+// github.com/docker/docker/api/types/network/NetworkingConfig
+// and for Singularity TBD.
+type NetworkingConfig struct {
+	EndpointsConfig map[string]*EndpointSettings
+}
+
+// ContainerCreateResponse in the case of docker will be similar to
+// github.com/docker/docker/api/types/container/ContainerCreateCreatedBody
+// and for Singularity TBD.
+type ContainerCreateResponse struct {
+	// The ID of the created container
+	// Required: true
+	ID string
+	// Warnings encountered when creating the container
+	// Required: true
+	Warnings []string
+}
+
+// ContainerStartOptions in the case of docker will be similar to
+// github.com/docker/docker/api/types/container/ContainerStartOptions
+// and for Singularity TBD.
+type ContainerStartOptions struct {
+	// FIXME: do we need this in this wrapping? since we only use it's zero value
+	// just to comply with Docker's ContainerStart API
+	// maybe not using it will be the best
+	CheckpointID  string
+	CheckpointDir string
+}
+
+// ContainerRemoveOptions in the case of docker will be similar to
+// github.com/docker/docker/api/types/container/ContainerRemoveOptions
+// and for Singularity TBD.
+type ContainerRemoveOptions struct {
+	// FIXME: we *only* call it with dockertypes.ContainerRemoveOptions{Force: true})
+	// may be should not be in this
+	Force bool
+}
+
+// ContainerAttachOptions in the case of docker will be similar to
+// github.com/docker/docker/api/types/container/ContainerAttachOptions
+// and for Singularity TBD.
+type ContainerAttachOptions struct {
+	Stream bool
+	Stdin  bool
+	Stdout bool
+	Stderr bool
+}
+
+// ImageRemoveOptions in the case of docker will be similar to
+// github.com/docker/docker/api/types/container/ImageRemoveOptions
+// and for Singularity TBD.
+type ImageRemoveOptions struct {
+	Force         bool
+	PruneChildren bool
+	//not used as far as I know
+}
+
+// ContainerInspectResponse in the case of docker will be similar to
+// github.com/docker/docker/api/types/ContainerJSON
+// and for Singularity TBD.
+// MAYBE call it ExecRunnerContainer? since the struct is describing  a container
+// from the underlying ExecRunner
+type ContainerInspectResponse struct {
+	//Important bits for us
+	// State = current checks: (nil, Running, Created)
+	State *ContainerState
+}
+
+// ImageInspectResponse  in the case of docker is similar to
+//  github.com/docker/docker/api/types/ImageInspect
+// and for Singularity TBD.
+// MAYBE call it ExecRunnerImage? since the struct is describing an image
+// from the underlying ExecRunner
+type ImageInspectResponse struct {
+	// we don't use the respones but we use ImageInspectWithRaw(context.TODO(), imageID)
+	// to check if we already have the docker image, maybe we can do the
+	// a imagePresent(id string) (bool)
+	ID string
+}
+
+// ImageLoadResponse returns information to the client about a load process.
+type ImageLoadResponse struct {
+	// Body must be closed to avoid a resource leak
+	Body io.ReadCloser
+	JSON bool
+}
+
+// ImageDeleteResponseItem is a reply from ImageRemove.
+type ImageDeleteResponseItem struct {
+
+	// The image ID of an image that was deleted
+	Deleted string `json:"Deleted,omitempty"`
+
+	// The image ID of an image that was untagged
+	Untagged string `json:"Untagged,omitempty"`
+}
+
+// ContainerState stores container's running state
+// it's part of ContainerJSONBase and will return by "inspect" command
+type ContainerState struct {
+	Status     string // String representation of the container state. Can be one of "created", "running", "paused", "restarting", "removing", "exited", or "dead"
+	Running    bool
+	Paused     bool
+	Restarting bool
+	OOMKilled  bool
+	Dead       bool
+	Pid        int
+	ExitCode   int
+	Error      string
+	StartedAt  string
+	FinishedAt string
+}
+
+// HijackedResponse holds connection information for a hijacked request.
+
+// HijackedResponse is needed as an artifact that comes from docker.
+// We need to figure out if this is the best abstraction at this level
+// for now this is a copy and paste from docker package. Might evolve later.
+type HijackedResponse struct {
+	Conn   net.Conn
+	Reader *bufio.Reader
+}
+
+// Close closes the hijacked connection and reader.
+func (h *HijackedResponse) Close() {
+	h.Conn.Close()
+}
+
+// CloseWriter is an interface that implements structs
+// that close input streams to prevent from writing.
+type CloseWriter interface {
+	CloseWrite() error
+}
+
+// CloseWrite closes a readWriter for writing.
+func (h *HijackedResponse) CloseWrite() error {
+	if conn, ok := h.Conn.(CloseWriter); ok {
+		return conn.CloseWrite()
+	}
+	return nil
+}
+
+//------------ End of HijackedResponse
+
+// Similar to HijackedResponse, Waitcondtion is here and will decide later how is implemented in Singularity
+
+// WaitCondition is a type used to specify a container state for which
+// to wait.
+type WaitCondition string
+
+// Possible WaitCondition Values.
+//
+// WaitConditionNotRunning (default) is used to wait for any of the non-running
+// states: "created", "exited", "dead", "removing", or "removed".
+//
+// WaitConditionNextExit is used to wait for the next time the state changes
+// to a non-running state. If the state is currently "created" or "exited",
+// this would cause Wait() to block until either the container runs and exits
+// or is removed.
+//
+// WaitConditionRemoved is used to wait for the container to be removed.
+const (
+	WaitConditionNotRunning WaitCondition = "not-running"
+	WaitConditionNextExit   WaitCondition = "next-exit"
+	WaitConditionRemoved    WaitCondition = "removed"
+)
+
+//------------ End of WaitCondition
+
+// ContainerWaitOKBody
+/// yetanother copy, this time from  from docker/api/types/container/container_wait.go.
+// That file is generated from swagger, so I don't think is a good idea to have it
+// here. but for now, I'll finish creating the abstraction layer
+
+// ContainerWaitOKBodyError container waiting error, if any
+// swagger:model ContainerWaitOKBodyError
+type ContainerWaitOKBodyError struct {
+
+	// Details of an error
+	Message string `json:"Message,omitempty"`
+}
+
+// ContainerWaitOKBody OK response to ContainerWait operation
+// swagger:model ContainerWaitOKBody
+type ContainerWaitOKBody struct {
+
+	// error
+	// Required: true
+	Error *ContainerWaitOKBodyError `json:"Error"`
+
+	// Exit code of the container
+	// Required: true
+	StatusCode int64 `json:"StatusCode"`
+}
+
+//------------ End of ContainerWaitOKBody
+
+// ThinContainerExecRunner is the "common denominator" interface for all ExecRunners
+// (either Docker or Singularity or more to come). For now is based in the
+//  ThinDockerClient interface with our own objects (instead of the ones that come
+// from docker)
+type ThinContainerExecRunner interface {
+	GetContainerConfig() (ContainerConfig, error)
+	GetHostConfig() (HostConfig, error)
+
+	ContainerAttach(ctx context.Context, container string, options ContainerAttachOptions) (HijackedResponse, error)
+	ContainerCreate(ctx context.Context, config ContainerConfig, hostConfig HostConfig, networkingConfig *NetworkingConfig, containerName string) (ContainerCreateResponse, error)
+	ContainerStart(ctx context.Context, container string, options ContainerStartOptions) error
+	ContainerRemove(ctx context.Context, container string, options ContainerRemoveOptions) error
+	ContainerWait(ctx context.Context, container string, condition WaitCondition) (<-chan ContainerWaitOKBody, <-chan error)
+
+	ContainerInspect(ctx context.Context, id string) (ContainerInspectResponse, error)
+	ImageInspectWithRaw(ctx context.Context, image string) (ImageInspectResponse, []byte, error)
+
+	ImageLoad(ctx context.Context, input io.Reader, quiet bool) (ImageLoadResponse, error)
+	ImageRemove(ctx context.Context, image string, options ImageRemoveOptions) ([]ImageDeleteResponseItem, error)
+}
diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 969682f46..fb6aaee99 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -95,7 +95,12 @@ type PsProcess interface {
 // ContainerRunner is the main stateful struct used for a single execution of a
 // container.
 type ContainerRunner struct {
-	Docker ThinDockerClient
+	ContainerExecRunner ThinContainerExecRunner
+
+	//Docker          ThinDockerClient
+	ContainerConfig dockercontainer.Config     //FIXME: translate this to the ThinContainerRunner interface
+	HostConfig      dockercontainer.HostConfig //FIXME: translate this to the ThinContainerRunner interface
+	//--------------
 
 	// Dispatcher client is initialized with the Dispatcher token.
 	// This is a privileged token used to manage container status
@@ -119,35 +124,33 @@ type ContainerRunner struct {
 	ContainerArvClient  IArvadosClient
 	ContainerKeepClient IKeepClient
 
-	Container       arvados.Container
-	ContainerConfig dockercontainer.Config
-	HostConfig      dockercontainer.HostConfig
-	token           string
-	ContainerID     string
-	ExitCode        *int
-	NewLogWriter    NewLogWriter
-	loggingDone     chan bool
-	CrunchLog       *ThrottledLogger
-	Stdout          io.WriteCloser
-	Stderr          io.WriteCloser
-	logUUID         string
-	logMtx          sync.Mutex
-	LogCollection   arvados.CollectionFileSystem
-	LogsPDH         *string
-	RunArvMount     RunArvMount
-	MkTempDir       MkTempDir
-	ArvMount        *exec.Cmd
-	ArvMountPoint   string
-	HostOutputDir   string
-	Binds           []string
-	Volumes         map[string]struct{}
-	OutputPDH       *string
-	SigChan         chan os.Signal
-	ArvMountExit    chan error
-	SecretMounts    map[string]arvados.Mount
-	MkArvClient     func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
-	finalState      string
-	parentTemp      string
+	Container     arvados.Container
+	token         string
+	ContainerID   string
+	ExitCode      *int
+	NewLogWriter  NewLogWriter
+	loggingDone   chan bool
+	CrunchLog     *ThrottledLogger
+	Stdout        io.WriteCloser
+	Stderr        io.WriteCloser
+	logUUID       string
+	logMtx        sync.Mutex
+	LogCollection arvados.CollectionFileSystem
+	LogsPDH       *string
+	RunArvMount   RunArvMount
+	MkTempDir     MkTempDir
+	ArvMount      *exec.Cmd
+	ArvMountPoint string
+	HostOutputDir string
+	Binds         []string
+	Volumes       map[string]struct{}
+	OutputPDH     *string
+	SigChan       chan os.Signal
+	ArvMountExit  chan error
+	SecretMounts  map[string]arvados.Mount
+	MkArvClient   func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
+	finalState    string
+	parentTemp    string
 
 	statLogger       io.WriteCloser
 	statReporter     *crunchstat.Reporter
@@ -209,7 +212,7 @@ func (runner *ContainerRunner) stop(sig os.Signal) {
 	}
 	runner.cCancelled = true
 	runner.CrunchLog.Printf("removing container")
-	err := runner.Docker.ContainerRemove(context.TODO(), runner.ContainerID, dockertypes.ContainerRemoveOptions{Force: true})
+	err := runner.ContainerExecRunner.ContainerRemove(context.TODO(), runner.ContainerID, ContainerRemoveOptions{Force: true})
 	if err != nil {
 		runner.CrunchLog.Printf("error removing container: %s", err)
 	}
@@ -283,7 +286,7 @@ func (runner *ContainerRunner) LoadImage() (err error) {
 
 	runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
 
-	_, _, err = runner.Docker.ImageInspectWithRaw(context.TODO(), imageID)
+	_, _, err = runner.ContainerExecRunner.ImageInspectWithRaw(context.TODO(), imageID)
 	if err != nil {
 		runner.CrunchLog.Print("Loading Docker image from keep")
 
@@ -293,7 +296,7 @@ func (runner *ContainerRunner) LoadImage() (err error) {
 			return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
 		}
 
-		response, err := runner.Docker.ImageLoad(context.TODO(), readCloser, true)
+		response, err := runner.ContainerExecRunner.ImageLoad(context.TODO(), readCloser, true)
 		if err != nil {
 			return fmt.Errorf("While loading container image into Docker: %v", err)
 		}
@@ -976,8 +979,8 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
 	}
 
 	stdinUsed := stdinRdr != nil || len(stdinJSON) != 0
-	response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
-		dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
+	response, err := runner.ContainerExecRunner.ContainerAttach(context.TODO(), runner.ContainerID,
+		ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
 	if err != nil {
 		return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
 	}
@@ -1063,16 +1066,19 @@ func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) {
 func (runner *ContainerRunner) CreateContainer() error {
 	runner.CrunchLog.Print("Creating Docker container")
 
-	runner.ContainerConfig.Cmd = runner.Container.Command
+	containerConfig, err := runner.ContainerExecRunner.GetContainerConfig()
+	hostConfig, err := runner.ContainerExecRunner.GetHostConfig()
+
+	containerConfig.Cmd = runner.Container.Command
 	if runner.Container.Cwd != "." {
-		runner.ContainerConfig.WorkingDir = runner.Container.Cwd
+		containerConfig.WorkingDir = runner.Container.Cwd
 	}
 
 	for k, v := range runner.Container.Environment {
-		runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
+		containerConfig.Env = append(containerConfig.Env, k+"="+v)
 	}
 
-	runner.ContainerConfig.Volumes = runner.Volumes
+	containerConfig.Volumes = runner.Volumes
 
 	maxRAM := int64(runner.Container.RuntimeConstraints.RAM)
 	minDockerRAM := int64(16)
@@ -1099,7 +1105,7 @@ func (runner *ContainerRunner) CreateContainer() error {
 		if err != nil {
 			return err
 		}
-		runner.ContainerConfig.Env = append(runner.ContainerConfig.Env,
+		containerConfig.Env = append(containerConfig.Env,
 			"ARVADOS_API_TOKEN="+tok,
 			"ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"),
 			"ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"),
@@ -1114,13 +1120,13 @@ func (runner *ContainerRunner) CreateContainer() error {
 	}
 
 	_, stdinUsed := runner.Container.Mounts["stdin"]
-	runner.ContainerConfig.OpenStdin = stdinUsed
-	runner.ContainerConfig.StdinOnce = stdinUsed
-	runner.ContainerConfig.AttachStdin = stdinUsed
-	runner.ContainerConfig.AttachStdout = true
-	runner.ContainerConfig.AttachStderr = true
+	containerConfig.OpenStdin = stdinUsed
+	containerConfig.StdinOnce = stdinUsed
+	containerConfig.AttachStdin = stdinUsed
+	containerConfig.AttachStdout = true
+	containerConfig.AttachStderr = true
 
-	createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID)
+	createdBody, err := runner.ContainerExecRunner.ContainerCreate(context.TODO(), containerConfig, hostConfig, nil, runner.Container.UUID)
 	if err != nil {
 		return fmt.Errorf("While creating container: %v", err)
 	}
@@ -1138,8 +1144,8 @@ func (runner *ContainerRunner) StartContainer() error {
 	if runner.cCancelled {
 		return ErrCancelled
 	}
-	err := runner.Docker.ContainerStart(context.TODO(), runner.ContainerID,
-		dockertypes.ContainerStartOptions{})
+	err := runner.ContainerExecRunner.ContainerStart(context.TODO(), runner.ContainerID,
+		ContainerStartOptions{})
 	if err != nil {
 		var advice string
 		if m, e := regexp.MatchString("(?ms).*(exec|System error).*(no such file or directory|file not found).*", err.Error()); m && e == nil {
@@ -1156,7 +1162,7 @@ func (runner *ContainerRunner) WaitFinish() error {
 	var runTimeExceeded <-chan time.Time
 	runner.CrunchLog.Print("Waiting for container to finish")
 
-	waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, dockercontainer.WaitConditionNotRunning)
+	waitOk, waitErr := runner.ContainerExecRunner.ContainerWait(context.TODO(), runner.ContainerID, WaitConditionNotRunning)
 	arvMountExit := runner.ArvMountExit
 	if timeout := runner.Container.SchedulingParameters.MaxRunTime; timeout > 0 {
 		runTimeExceeded = time.After(time.Duration(timeout) * time.Second)
@@ -1170,7 +1176,7 @@ func (runner *ContainerRunner) WaitFinish() error {
 		}
 		for range time.NewTicker(runner.containerWatchdogInterval).C {
 			ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(runner.containerWatchdogInterval))
-			ctr, err := runner.Docker.ContainerInspect(ctx, runner.ContainerID)
+			ctr, err := runner.ContainerExecRunner.ContainerInspect(ctx, runner.ContainerID)
 			cancel()
 			runner.cStateLock.Lock()
 			done := runner.cRemoved || runner.ExitCode != nil
@@ -1727,14 +1733,14 @@ func (runner *ContainerRunner) fetchContainerRecord() error {
 func NewContainerRunner(dispatcherClient *arvados.Client,
 	dispatcherArvClient IArvadosClient,
 	dispatcherKeepClient IKeepClient,
-	docker ThinDockerClient,
+	containerRunner ThinContainerExecRunner,
 	containerUUID string) (*ContainerRunner, error) {
 
 	cr := &ContainerRunner{
 		dispatcherClient:     dispatcherClient,
 		DispatcherArvClient:  dispatcherArvClient,
 		DispatcherKeepClient: dispatcherKeepClient,
-		Docker:               docker,
+		ContainerExecRunner:  containerRunner,
 	}
 	cr.NewLogWriter = cr.NewArvLogWriter
 	cr.RunArvMount = cr.ArvMountCmd
@@ -1794,7 +1800,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
     	`)
 	memprofile := flags.String("memprofile", "", "write memory profile to `file` after running container")
 	flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
-
+	containerRunner := flags.String("container-runner", "docker",
+		`Specify the container runner. available options: docker, singularity.
+	`)
 	ignoreDetachFlag := false
 	if len(args) > 0 && args[0] == "-no-detach" {
 		// This process was invoked by a parent process, which
@@ -1863,22 +1871,41 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 	kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
 	kc.Retries = 4
 
-	// API version 1.21 corresponds to Docker 1.9, which is currently the
-	// minimum version we want to support.
-	docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
+	var cr *ContainerRunner
+	if *containerRunner == "docker" {
+		// API version 1.21 corresponds to Docker 1.9, which is currently the
+		// minimum version we want to support.
+		docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
 
-	cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, docker, containerID)
-	if err != nil {
-		log.Print(err)
-		return 1
-	}
-	if dockererr != nil {
-		cr.CrunchLog.Printf("%s: %v", containerID, dockererr)
-		cr.checkBrokenNode(dockererr)
-		cr.CrunchLog.Close()
-		return 1
-	}
+		cr, err = NewContainerRunner(arvados.NewClientFromEnv(), api, kc, adapter(docker), containerID)
+		if err != nil {
+			log.Print(err)
+			return 1
+		}
+		if dockererr != nil {
+			cr.CrunchLog.Printf("%s: %v", containerID, dockererr)
+			cr.checkBrokenNode(dockererr)
+			cr.CrunchLog.Close()
+			return 1
+		}
+	} else {
+		// Singularity
+
+		singularity, singularityerr := NewSingularityClient()
+
+		cr, err = NewContainerRunner(arvados.NewClientFromEnv(), api, kc, singularity, containerID)
+		if err != nil {
+			log.Print(err)
+			return 1
+		}
 
+		if singularityerr != nil {
+			cr.CrunchLog.Printf("%s: %v", containerID, singularityerr)
+			//cr.checkBrokenNode(singularityrerr) //
+			cr.CrunchLog.Close()
+			return 1
+		}
+	}
 	cr.gateway = Gateway{
 		Address:           os.Getenv("GatewayAddress"),
 		AuthSecret:        os.Getenv("GatewayAuthSecret"),
diff --git a/lib/crunchrun/crunchrun_test.go b/lib/crunchrun/crunchrun_test.go
index dbdaa6293..ddf40543e 100644
--- a/lib/crunchrun/crunchrun_test.go
+++ b/lib/crunchrun/crunchrun_test.go
@@ -439,7 +439,7 @@ func (client *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename s
 
 func (s *TestSuite) TestLoadImage(c *C) {
 	cr, err := NewContainerRunner(s.client, &ArvTestClient{},
-		&KeepTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+		&KeepTestClient{}, adapter(s.docker), "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
 	c.Assert(err, IsNil)
 
 	kc := &KeepTestClient{}
@@ -447,10 +447,10 @@ func (s *TestSuite) TestLoadImage(c *C) {
 	cr.ContainerArvClient = &ArvTestClient{}
 	cr.ContainerKeepClient = kc
 
-	_, err = cr.Docker.ImageRemove(nil, hwImageID, dockertypes.ImageRemoveOptions{})
+	_, err = cr.ContainerExecRunner.ImageRemove(nil, hwImageID, ImageRemoveOptions{})
 	c.Check(err, IsNil)
 
-	_, _, err = cr.Docker.ImageInspectWithRaw(nil, hwImageID)
+	_, _, err = cr.ContainerExecRunner.ImageInspectWithRaw(nil, hwImageID)
 	c.Check(err, NotNil)
 
 	cr.Container.ContainerImage = hwPDH
@@ -463,13 +463,13 @@ func (s *TestSuite) TestLoadImage(c *C) {
 
 	c.Check(err, IsNil)
 	defer func() {
-		cr.Docker.ImageRemove(nil, hwImageID, dockertypes.ImageRemoveOptions{})
+		cr.ContainerExecRunner.ImageRemove(nil, hwImageID, ImageRemoveOptions{})
 	}()
 
 	c.Check(kc.Called, Equals, true)
 	c.Check(cr.ContainerConfig.Image, Equals, hwImageID)
 
-	_, _, err = cr.Docker.ImageInspectWithRaw(nil, hwImageID)
+	_, _, err = cr.ContainerExecRunner.ImageInspectWithRaw(nil, hwImageID)
 	c.Check(err, IsNil)
 
 	// (2) Test using image that's already loaded
@@ -574,7 +574,7 @@ func (s *TestSuite) TestLoadImageArvError(c *C) {
 func (s *TestSuite) TestLoadImageKeepError(c *C) {
 	// (2) Keep error
 	kc := &KeepErrorTestClient{}
-	cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+	cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, adapter(s.docker), "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
 	c.Assert(err, IsNil)
 
 	cr.ContainerArvClient = &ArvTestClient{}
@@ -604,7 +604,7 @@ func (s *TestSuite) TestLoadImageCollectionError(c *C) {
 func (s *TestSuite) TestLoadImageKeepReadError(c *C) {
 	// (4) Collection doesn't contain image
 	kc := &KeepReadErrorTestClient{}
-	cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+	cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, adapter(s.docker), "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
 	c.Assert(err, IsNil)
 	cr.Container.ContainerImage = hwPDH
 	cr.ContainerArvClient = &ArvTestClient{}
@@ -653,7 +653,7 @@ func (s *TestSuite) TestRunContainer(c *C) {
 	}
 	kc := &KeepTestClient{}
 	defer kc.Close()
-	cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+	cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, adapter(s.docker), "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
 	c.Assert(err, IsNil)
 
 	cr.ContainerArvClient = &ArvTestClient{}
@@ -777,7 +777,7 @@ func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, exi
 	s.docker.api = api
 	kc := &KeepTestClient{}
 	defer kc.Close()
-	cr, err = NewContainerRunner(s.client, api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+	cr, err = NewContainerRunner(s.client, api, kc, adapter(s.docker), "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
 	c.Assert(err, IsNil)
 	s.runner = cr
 	cr.statInterval = 100 * time.Millisecond
@@ -1136,7 +1136,7 @@ func (s *TestSuite) testStopContainer(c *C, setup func(cr *ContainerRunner)) {
 	api := &ArvTestClient{Container: rec}
 	kc := &KeepTestClient{}
 	defer kc.Close()
-	cr, err := NewContainerRunner(s.client, api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+	cr, err := NewContainerRunner(s.client, api, kc, adapter(s.docker), "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
 	c.Assert(err, IsNil)
 	cr.RunArvMount = func([]string, string) (*exec.Cmd, error) { return nil, nil }
 	cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
@@ -1621,7 +1621,7 @@ func (s *TestSuite) stdoutErrorRunHelper(c *C, record string, fn func(t *TestDoc
 	api = &ArvTestClient{Container: rec}
 	kc := &KeepTestClient{}
 	defer kc.Close()
-	cr, err = NewContainerRunner(s.client, api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+	cr, err = NewContainerRunner(s.client, api, kc, adapter(s.docker), "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
 	c.Assert(err, IsNil)
 	am := &ArvMountCmdLine{}
 	cr.RunArvMount = am.ArvMountTest
diff --git a/lib/crunchrun/docker_adapter.go b/lib/crunchrun/docker_adapter.go
new file mode 100644
index 000000000..795d06b56
--- /dev/null
+++ b/lib/crunchrun/docker_adapter.go
@@ -0,0 +1,219 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package crunchrun
+
+import (
+	"io"
+
+	dockertypes "github.com/docker/docker/api/types"
+	dockercontainer "github.com/docker/docker/api/types/container"
+	dockernetwork "github.com/docker/docker/api/types/network"
+	"golang.org/x/net/context"
+)
+
+type DockerAdapter struct {
+	docker          ThinDockerClient
+	containerConfig ContainerConfig
+	hostConfig      HostConfig
+}
+
+func (a *DockerAdapter) ContainerAttach(ctx context.Context, container string, options ContainerAttachOptions) (HijackedResponse, error) {
+	dockerOptions := dockertypes.ContainerAttachOptions{
+		Stream: options.Stream,
+		Stdin:  options.Stdin,
+		Stdout: options.Stdout,
+		Stderr: options.Stderr}
+	dockerResponse, docker_err := a.docker.ContainerAttach(ctx, container, dockerOptions)
+
+	adapterResponse := HijackedResponse{
+		Conn:   dockerResponse.Conn,
+		Reader: dockerResponse.Reader,
+	}
+
+	return adapterResponse, docker_err
+}
+
+func (a *DockerAdapter) ContainerCreate(ctx context.Context, config ContainerConfig, hostConfig HostConfig, networkingConfig *NetworkingConfig, containerName string) (ContainerCreateResponse, error) {
+	var dockerEndpointsConfig map[string]*dockernetwork.EndpointSettings
+
+	var dockerNetworkConfig *dockernetwork.NetworkingConfig
+	if networkingConfig != nil {
+		for k, v := range networkingConfig.EndpointsConfig {
+
+			dockerIpamConfig := &dockernetwork.EndpointIPAMConfig{
+				IPv4Address:  v.IPAMConfig.IPv4Address,
+				IPv6Address:  v.IPAMConfig.IPv6Address,
+				LinkLocalIPs: v.IPAMConfig.LinkLocalIPs,
+			}
+			dockerEndpointsConfig[k] = &dockernetwork.EndpointSettings{
+				IPAMConfig:          dockerIpamConfig,
+				Links:               v.Links,
+				Aliases:             v.Aliases,
+				NetworkID:           v.NetworkID,
+				EndpointID:          v.EndpointID,
+				Gateway:             v.Gateway,
+				IPAddress:           v.IPAddress,
+				IPPrefixLen:         v.IPPrefixLen,
+				IPv6Gateway:         v.IPv6Gateway,
+				GlobalIPv6Address:   v.GlobalIPv6Address,
+				GlobalIPv6PrefixLen: v.GlobalIPv6PrefixLen,
+				MacAddress:          v.MacAddress,
+				DriverOpts:          v.DriverOpts,
+			}
+
+		}
+
+		dockerNetworkConfig = &dockernetwork.NetworkingConfig{
+			EndpointsConfig: dockerEndpointsConfig,
+		}
+	}
+	dockerConfig := dockercontainer.Config{
+		OpenStdin:    config.OpenStdin,
+		StdinOnce:    config.StdinOnce,
+		AttachStdin:  config.AttachStdin,
+		AttachStdout: config.AttachStdout,
+		AttachStderr: config.AttachStderr,
+		Cmd:          config.Cmd,
+		WorkingDir:   config.WorkingDir,
+		Env:          config.Env,
+		Volumes:      config.Volumes,
+	}
+	dockerHostConfig := dockercontainer.HostConfig{}
+
+	dockerResponse, dockerErr := a.docker.ContainerCreate(ctx,
+		&dockerConfig,
+		&dockerHostConfig, dockerNetworkConfig, containerName)
+	adapterResponse := ContainerCreateResponse{
+		ID:       dockerResponse.ID,
+		Warnings: dockerResponse.Warnings,
+	}
+	return adapterResponse, dockerErr
+}
+
+func (a *DockerAdapter) ContainerStart(ctx context.Context, container string, options ContainerStartOptions) error {
+	dockerContainerStartOptions := dockertypes.ContainerStartOptions{
+		CheckpointID:  options.CheckpointID,
+		CheckpointDir: options.CheckpointDir,
+	}
+
+	dockerErr := a.docker.ContainerStart(ctx, container, dockerContainerStartOptions)
+
+	return dockerErr
+}
+
+func (a *DockerAdapter) ContainerRemove(ctx context.Context, container string, options ContainerRemoveOptions) error {
+	dockerContainerRemoveOptions := dockertypes.ContainerRemoveOptions{Force: options.Force}
+
+	dockerErr := a.docker.ContainerRemove(ctx, container, dockerContainerRemoveOptions)
+
+	return dockerErr
+}
+
+func (a *DockerAdapter) ContainerInspect(ctx context.Context, id string) (ContainerInspectResponse, error) {
+
+	dockerContainerInspectResponse, dockerErr := a.docker.ContainerInspect(ctx, id)
+
+	containerState := &ContainerState{
+		Running:    dockerContainerInspectResponse.State.Running,
+		Paused:     dockerContainerInspectResponse.State.Paused,
+		Restarting: dockerContainerInspectResponse.State.Restarting,
+		OOMKilled:  dockerContainerInspectResponse.State.OOMKilled,
+		Dead:       dockerContainerInspectResponse.State.Dead,
+		Pid:        dockerContainerInspectResponse.State.Pid,
+		ExitCode:   dockerContainerInspectResponse.State.ExitCode,
+		Error:      dockerContainerInspectResponse.State.Error,
+		StartedAt:  dockerContainerInspectResponse.State.StartedAt,
+		FinishedAt: dockerContainerInspectResponse.State.FinishedAt,
+	}
+
+	adapterResponse := &ContainerInspectResponse{State: containerState}
+
+	return *adapterResponse, dockerErr
+}
+
+func (a *DockerAdapter) ContainerWait(ctx context.Context, container string, condition WaitCondition) (<-chan ContainerWaitOKBody, <-chan error) {
+
+	//var dockercontainerCondition dockercontainer.WaitCondition =
+	dockercontainerCondition := dockercontainer.WaitCondition(condition)
+
+	dockerContainerWaitOKBody, dockerErr := a.docker.ContainerWait(ctx, container, dockercontainerCondition)
+
+	// translate from <-chan dockercontainer.ContainerWaitOKBody to <-chan ContainerWaitOKBody,
+	adapterContainerWaitOKBody := make(chan ContainerWaitOKBody)
+	go func() {
+		for dockerMsg := range dockerContainerWaitOKBody {
+			var adapterBodyMsg *ContainerWaitOKBody
+			var adapterError *ContainerWaitOKBodyError
+
+			if dockerMsg.Error != nil {
+				adapterError = &ContainerWaitOKBodyError{
+					Message: dockerMsg.Error.Message,
+				}
+			}
+
+			adapterBodyMsg = &ContainerWaitOKBody{
+				Error:      adapterError,
+				StatusCode: dockerMsg.StatusCode,
+			}
+
+			adapterContainerWaitOKBody <- *adapterBodyMsg
+		}
+	}()
+
+	return adapterContainerWaitOKBody, dockerErr
+}
+
+func (a *DockerAdapter) ImageInspectWithRaw(ctx context.Context, image string) (ImageInspectResponse, []byte, error) {
+	dockerImageInspectResponse, rawBytes, dockerErr := a.docker.ImageInspectWithRaw(ctx, image)
+
+	adapterImageInspectResponse := &ImageInspectResponse{
+		ID: dockerImageInspectResponse.ID,
+	}
+	return *adapterImageInspectResponse, rawBytes, dockerErr
+}
+
+func (a *DockerAdapter) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (ImageLoadResponse, error) {
+	dockerImageLoadResponse, dockerErr := a.docker.ImageLoad(ctx, input, quiet)
+
+	adapterImageLoadResponse := &ImageLoadResponse{
+		Body: dockerImageLoadResponse.Body,
+		JSON: dockerImageLoadResponse.JSON,
+	}
+
+	return *adapterImageLoadResponse, dockerErr
+}
+
+func (a *DockerAdapter) ImageRemove(ctx context.Context, image string, options ImageRemoveOptions) ([]ImageDeleteResponseItem, error) {
+
+	dockerOptions := &dockertypes.ImageRemoveOptions{
+		Force:         options.Force,
+		PruneChildren: options.PruneChildren,
+	}
+	dockerImageDeleteResponseItems, dockerErr := a.docker.ImageRemove(ctx, image, *dockerOptions)
+
+	var adapterResponses []ImageDeleteResponseItem
+	for _, dockerResponse := range dockerImageDeleteResponseItems {
+		adapterResponse := &ImageDeleteResponseItem{
+			Deleted:  dockerResponse.Deleted,
+			Untagged: dockerResponse.Untagged,
+		}
+		adapterResponses = append(adapterResponses, *adapterResponse)
+	}
+	return adapterResponses, dockerErr
+}
+
+func (a *DockerAdapter) GetContainerConfig() (ContainerConfig, error) {
+	return a.containerConfig, nil
+}
+
+func (a *DockerAdapter) GetHostConfig() (HostConfig, error) {
+	return a.hostConfig, nil
+}
+
+func adapter(docker ThinDockerClient) ThinContainerExecRunner {
+	return_object := &DockerAdapter{docker: docker}
+
+	return return_object
+}
diff --git a/lib/crunchrun/singularity.go b/lib/crunchrun/singularity.go
new file mode 100644
index 000000000..91ec0ea11
--- /dev/null
+++ b/lib/crunchrun/singularity.go
@@ -0,0 +1,86 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package crunchrun
+
+import (
+	"fmt"
+	"io"
+
+	"golang.org/x/net/context"
+)
+
+type SingularityClient struct {
+	containerConfig ContainerConfig
+	hostConfig      HostConfig
+}
+
+func (c SingularityClient) GetContainerConfig() (ContainerConfig, error) {
+	return c.containerConfig, nil
+}
+
+func (c SingularityClient) GetHostConfig() (HostConfig, error) {
+	return c.hostConfig, nil
+}
+
+func (c SingularityClient) ContainerAttach(ctx context.Context, container string, options ContainerAttachOptions) (HijackedResponse, error) {
+	fmt.Printf("placeholder for container ContainerAttach %s", container)
+
+	return HijackedResponse{}, nil
+}
+
+func (c SingularityClient) ContainerCreate(ctx context.Context, config ContainerConfig, hostConfig HostConfig, networkingConfig *NetworkingConfig, containerName string) (ContainerCreateResponse, error) {
+	fmt.Printf("placeholder for container ContainerCreate %s", containerName)
+
+	return ContainerCreateResponse{}, nil
+}
+
+func (c SingularityClient) ContainerStart(ctx context.Context, container string, options ContainerStartOptions) error {
+	fmt.Printf("placeholder for container ContainerStart %s", container)
+
+	return nil
+}
+
+func (c SingularityClient) ContainerRemove(ctx context.Context, container string, options ContainerRemoveOptions) error {
+	fmt.Printf("placeholder for container ContainerRemove %s", container)
+
+	return nil
+}
+
+func (c SingularityClient) ContainerWait(ctx context.Context, container string, condition WaitCondition) (<-chan ContainerWaitOKBody, <-chan error) {
+	fmt.Printf("placeholder for ContainerWait")
+	chanC := make(chan ContainerWaitOKBody)
+	chanE := make(chan error)
+	return chanC, chanE
+}
+
+func (c SingularityClient) ContainerInspect(ctx context.Context, id string) (ContainerInspectResponse, error) {
+	fmt.Printf("placeholder for container ContainerInspect %s", id)
+
+	return ContainerInspectResponse{}, nil
+}
+
+func (c SingularityClient) ImageInspectWithRaw(ctx context.Context, image string) (ImageInspectResponse, []byte, error) {
+	fmt.Printf("placeholder for ImageInspectWithRaw() %s", image)
+
+	return ImageInspectResponse{}, []byte(""), nil
+}
+
+func (c SingularityClient) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (ImageLoadResponse, error) {
+	fmt.Printf("placeholder for ImageLoad")
+	return ImageLoadResponse{}, nil
+}
+
+func (c SingularityClient) ImageRemove(ctx context.Context, image string, options ImageRemoveOptions) ([]ImageDeleteResponseItem, error) {
+	fmt.Printf("placeholder for ImageRemove")
+	var responses []ImageDeleteResponseItem
+	tmp := ImageDeleteResponseItem{}
+	responses = append(responses, tmp)
+	return responses, nil
+}
+
+func NewSingularityClient() (SingularityClient, error) {
+	var s = &SingularityClient{}
+	return *s, nil
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list