[ARVADOS] updated: 1.1.2-120-g8758f3d

Git user git at public.curoverse.com
Mon Jan 22 12:34:53 EST 2018


Summary of changes:
 services/crunch-dispatch-slurm/crunch-dispatch-slurm.go |  1 -
 services/crunch-dispatch-slurm/slurm.go                 | 10 +++++-----
 services/crunch-run/crunchrun.go                        |  9 ++++-----
 services/crunch-run/crunchrun_test.go                   |  2 +-
 4 files changed, 10 insertions(+), 12 deletions(-)

  discards  405dea2598f2e45ecc9337aa462c385cd4e893f0 (commit)
  discards  81350383aee055b3bd0b6f25e90575b13ffa350f (commit)
  discards  2f512a373dbb77ec2b170387bfb4ae03c89f2281 (commit)
  discards  c5455d7940c7838551d64f0272eb6f6b1ec04dd9 (commit)
  discards  6846bec056e246cd68a2881b7b0edffe0bea24b6 (commit)
       via  8758f3ddc81b9ef9aaab111eb331e452c4ec7de5 (commit)
       via  a63829d71b904ead17aeac3c68bf01e9b020a3a1 (commit)
       via  bb72c328306667a0429c10caa90a79b2b9ff0bd6 (commit)
       via  2edd084814bd450c0bfb14915bb9bc3ce96498eb (commit)
       via  3194c1b24ffe6fff5fcb2f620ca6ee43741e3462 (commit)
       via  d14421423f2bd72b2c3eb95bdebe85a210972a12 (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the old revision is not a strict subset of the new revision.  This
situation occurs when you --force push a change and generate a repository
containing something like this:

 * -- * -- B -- O -- O -- O (405dea2598f2e45ecc9337aa462c385cd4e893f0)
            \
             N -- N -- N (8758f3ddc81b9ef9aaab111eb331e452c4ec7de5)

When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 8758f3ddc81b9ef9aaab111eb331e452c4ec7de5
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Mon Jan 22 10:57:18 2018 -0500

    12891: Use docker "rm -f" instead of "stop" to cancel container.
    
    "Stop" sends SIGTERM and then (after a timeout) SIGKILL to the main
    process in the container. If the contained process catches SIGTERM,
    the outcome depends on how much cleanup it manages to do in the
    timeout period. We prefer a more predictable SIGKILL outcome.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 45e48c2..a249273 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -75,7 +75,7 @@ type ThinDockerClient interface {
 	ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
 		networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error)
 	ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
-	ContainerStop(ctx context.Context, container string, timeout *time.Duration) error
+	ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error
 	ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error)
 	ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
 	ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
@@ -165,11 +165,10 @@ func (runner *ContainerRunner) stop() {
 		return
 	}
 	runner.cCancelled = true
-	runner.CrunchLog.Printf("stopping container")
-	timeout := 10 * time.Second
-	err := runner.Docker.ContainerStop(context.TODO(), runner.ContainerID, &timeout)
+	runner.CrunchLog.Printf("removing container")
+	err := runner.Docker.ContainerRemove(context.TODO(), runner.ContainerID, dockertypes.ContainerRemoveOptions{Force: true})
 	if err != nil {
-		runner.CrunchLog.Printf("error stopping container: %s", err)
+		runner.CrunchLog.Printf("error removing container: %s", err)
 	}
 }
 
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index a524e48..a17dff3 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/services/crunch-run/crunchrun_test.go
@@ -152,7 +152,7 @@ func (t *TestDockerClient) ContainerStart(ctx context.Context, container string,
 	}
 }
 
-func (t *TestDockerClient) ContainerStop(ctx context.Context, container string, timeout *time.Duration) error {
+func (t *TestDockerClient) ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error {
 	t.stop <- true
 	return nil
 }
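
For context, the ContainerRemove call used above looks roughly like this in a
standalone program (a minimal sketch against the Docker Go SDK of that era;
the client setup and argument handling are illustrative, not part of this
commit):

    package main

    import (
    	"context"
    	"log"
    	"os"

    	dockertypes "github.com/docker/docker/api/types"
    	dockerclient "github.com/docker/docker/client"
    )

    func main() {
    	if len(os.Args) < 2 {
    		log.Fatal("usage: forcerm <container-id>")
    	}
    	cli, err := dockerclient.NewEnvClient() // reads DOCKER_HOST etc.
    	if err != nil {
    		log.Fatal(err)
    	}
    	// Force: true is the API equivalent of "docker rm -f": the daemon
    	// kills the container outright, so the outcome does not depend on
    	// how the contained process handles SIGTERM.
    	err = cli.ContainerRemove(context.Background(), os.Args[1],
    		dockertypes.ContainerRemoveOptions{Force: true})
    	if err != nil {
    		log.Printf("error removing container: %s", err)
    	}
    }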

commit a63829d71b904ead17aeac3c68bf01e9b020a3a1
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Jan 18 16:41:39 2018 -0500

    12891: Avoid flapping finalState after cancel.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index a78d47d..45e48c2 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -1139,10 +1139,6 @@ func (runner *ContainerRunner) UploadOutputFile(
 
 // HandleOutput sets the output, unmounts the FUSE mount, and deletes temporary directories
 func (runner *ContainerRunner) CaptureOutput() error {
-	if runner.finalState != "Complete" {
-		return nil
-	}
-
 	if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
 		// Output may have been set directly by the container, so
 		// refresh the container record to check.
@@ -1595,7 +1591,7 @@ func (runner *ContainerRunner) Run() (err error) {
 	}
 
 	err = runner.WaitFinish()
-	if err == nil {
+	if err == nil && !runner.IsCancelled() {
 		runner.finalState = "Complete"
 	}
 	return

commit bb72c328306667a0429c10caa90a79b2b9ff0bd6
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Jan 18 16:41:14 2018 -0500

    12891: Fix use of magic string.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index b8c80dd..a78d47d 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -930,7 +930,7 @@ func (runner *ContainerRunner) StartContainer() error {
 func (runner *ContainerRunner) WaitFinish() (err error) {
 	runner.CrunchLog.Print("Waiting for container to finish")
 
-	waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, "not-running")
+	waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, dockercontainer.WaitConditionNotRunning)
 
 	go func() {
 		<-runner.ArvMountExit
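
For reference, the "magic string" being replaced is the literal value of a
named constant in the Docker API types package, declared along these lines
(paraphrased, so treat as illustrative rather than exact):

    // github.com/docker/docker/api/types/container
    type WaitCondition string

    const (
    	WaitConditionNotRunning WaitCondition = "not-running"
    	WaitConditionNextExit   WaitCondition = "next-exit"
    	WaitConditionRemoved    WaitCondition = "removed"
    )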

commit 2edd084814bd450c0bfb14915bb9bc3ce96498eb
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Jan 18 16:40:59 2018 -0500

    12891: Don't give up after trying once to stop the docker container.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 1bd4956..b8c80dd 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -150,11 +150,10 @@ func (runner *ContainerRunner) setupSignals() {
 	signal.Notify(runner.SigChan, syscall.SIGQUIT)
 
 	go func(sig chan os.Signal) {
-		s := <-sig
-		if s != nil {
-			runner.CrunchLog.Printf("Caught signal %v", s)
+		for s := range sig {
+			runner.CrunchLog.Printf("caught signal: %v", s)
+			runner.stop()
 		}
-		runner.stop()
 	}(runner.SigChan)
 }
 
@@ -162,25 +161,21 @@ func (runner *ContainerRunner) setupSignals() {
 func (runner *ContainerRunner) stop() {
 	runner.cStateLock.Lock()
 	defer runner.cStateLock.Unlock()
-	if runner.cCancelled {
+	if !runner.cStarted {
 		return
 	}
 	runner.cCancelled = true
-	if runner.cStarted {
-		timeout := time.Duration(10)
-		err := runner.Docker.ContainerStop(context.TODO(), runner.ContainerID, &(timeout))
-		if err != nil {
-			runner.CrunchLog.Printf("StopContainer failed: %s", err)
-		}
-		// Suppress multiple calls to stop()
-		runner.cStarted = false
+	runner.CrunchLog.Printf("stopping container")
+	timeout := 10 * time.Second
+	err := runner.Docker.ContainerStop(context.TODO(), runner.ContainerID, &timeout)
+	if err != nil {
+		runner.CrunchLog.Printf("error stopping container: %s", err)
 	}
 }
 
 func (runner *ContainerRunner) stopSignals() {
 	if runner.SigChan != nil {
 		signal.Stop(runner.SigChan)
-		close(runner.SigChan)
 	}
 }
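
The new pattern in isolation, as a self-contained sketch (the signal set and
the cleanup body are placeholders, not taken from crunch-run):

    package main

    import (
    	"log"
    	"os"
    	"os/signal"
    	"syscall"
    	"time"
    )

    func main() {
    	sig := make(chan os.Signal, 1)
    	signal.Notify(sig, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
    	go func() {
    		// Ranging over the channel handles every delivered signal,
    		// so a repeated SIGTERM retries the cleanup instead of being
    		// dropped after the first attempt.
    		for s := range sig {
    			log.Printf("caught signal: %v", s)
    			// cleanup (e.g. stop the container) would go here
    		}
    	}()

    	time.Sleep(10 * time.Second) // stand-in for the real work
    	signal.Stop(sig)             // stop delivery; the channel stays open
    }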
 

commit 3194c1b24ffe6fff5fcb2f620ca6ee43741e3462
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Jan 18 16:39:16 2018 -0500

    12891: Don't use SIGKILL when telling crunch-run to cancel.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/services/crunch-dispatch-slurm/slurm.go b/services/crunch-dispatch-slurm/slurm.go
index f675f6c..bd19377 100644
--- a/services/crunch-dispatch-slurm/slurm.go
+++ b/services/crunch-dispatch-slurm/slurm.go
@@ -26,7 +26,28 @@ func (scli *slurmCLI) Batch(script io.Reader, args []string) error {
 }
 
 func (scli *slurmCLI) Cancel(name string) error {
-	return scli.run(nil, "scancel", []string{"--name=" + name})
+	for _, args := range [][]string{
+		// If the slurm job hasn't started yet, remove it from
+		// the queue.
+		{"--state=pending"},
+		// If the slurm job has started, send SIGTERM. If we
+		// cancel a running job without a --signal argument,
+		// slurm will send SIGTERM and then (after some
+		// site-configured interval) SIGKILL. This would kill
+		// crunch-run without stopping the container, which we
+		// don't want.
+		{"--batch", "--signal=TERM", "--state=running"},
+		{"--batch", "--signal=TERM", "--state=suspended"},
+	} {
+		err := scli.run(nil, "scancel", append([]string{"--name=" + name}, args...))
+		if err != nil {
+			// scancel exits 0 if no job matches the given
+			// name and state. Any error from scancel here
+			// really indicates something is wrong.
+			return err
+		}
+	}
+	return nil
 }
 
 func (scli *slurmCLI) QueueCommand(args []string) *exec.Cmd {
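
In shell terms, the new Cancel issues up to three scancel invocations for a
job named NAME: "scancel --name=NAME --state=pending", then
"scancel --name=NAME --batch --signal=TERM --state=running", then the same
with --state=suspended. A job that hasn't started is simply dequeued, while a
running job gets SIGTERM in its batch step so crunch-run can stop the
container itself.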

commit d14421423f2bd72b2c3eb95bdebe85a210972a12
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Jan 18 16:28:39 2018 -0500

    12891: Refactor slurm commands.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 3c89103..ae2ca58 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -7,14 +7,12 @@ package main
 // Dispatcher service for Crunch that submits containers to the slurm queue.
 
 import (
-	"bytes"
 	"context"
 	"flag"
 	"fmt"
 	"log"
 	"math"
 	"os"
-	"os/exec"
 	"regexp"
 	"strings"
 	"time"
@@ -43,9 +41,12 @@ type Config struct {
 
 	// Minimum time between two attempts to run the same container
 	MinRetryPeriod arvados.Duration
+
+	slurm Slurm
 }
 
 func main() {
+	theConfig.slurm = &slurmCLI{}
 	err := doMain()
 	if err != nil {
 		log.Fatal(err)
@@ -175,8 +176,7 @@ func niceness(priority int) int {
 	return (1000 - priority) * 10
 }
 
-// sbatchCmd
-func sbatchFunc(container arvados.Container) *exec.Cmd {
+func sbatchArgs(container arvados.Container) []string {
 	mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM) / float64(1048576)))
 
 	var disk int64
@@ -198,61 +198,22 @@ func sbatchFunc(container arvados.Container) *exec.Cmd {
 		sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
 	}
 
-	return exec.Command("sbatch", sbatchArgs...)
-}
-
-// scancelCmd
-func scancelFunc(container arvados.Container) *exec.Cmd {
-	return exec.Command("scancel", "--name="+container.UUID)
-}
-
-// scontrolCmd
-func scontrolFunc(container arvados.Container) *exec.Cmd {
-	return exec.Command("scontrol", "update", "JobName="+container.UUID, fmt.Sprintf("Nice=%d", niceness(container.Priority)))
+	return sbatchArgs
 }
 
-// Wrap these so that they can be overridden by tests
-var sbatchCmd = sbatchFunc
-var scancelCmd = scancelFunc
-var scontrolCmd = scontrolFunc
-
-// Submit job to slurm using sbatch.
 func submit(dispatcher *dispatch.Dispatcher, container arvados.Container, crunchRunCommand []string) error {
-	cmd := sbatchCmd(container)
-
-	// Send a tiny script on stdin to execute the crunch-run
-	// command (slurm requires this to be a #! script)
-
 	// append() here avoids modifying crunchRunCommand's
 	// underlying array, which is shared with other goroutines.
-	args := append([]string(nil), crunchRunCommand...)
-	args = append(args, container.UUID)
-	cmd.Stdin = strings.NewReader(execScript(args))
-
-	var stdout, stderr bytes.Buffer
-	cmd.Stdout = &stdout
-	cmd.Stderr = &stderr
+	crArgs := append([]string(nil), crunchRunCommand...)
+	crArgs = append(crArgs, container.UUID)
+	crScript := strings.NewReader(execScript(crArgs))
 
-	// Mutex between squeue sync and running sbatch or scancel.
 	sqCheck.L.Lock()
 	defer sqCheck.L.Unlock()
 
-	log.Printf("exec sbatch %+q", cmd.Args)
-	err := cmd.Run()
-
-	switch err.(type) {
-	case nil:
-		log.Printf("sbatch succeeded: %q", strings.TrimSpace(stdout.String()))
-		return nil
-
-	case *exec.ExitError:
-		dispatcher.Unlock(container.UUID)
-		return fmt.Errorf("sbatch %+q failed: %v (stderr: %q)", cmd.Args, err, stderr.Bytes())
-
-	default:
-		dispatcher.Unlock(container.UUID)
-		return fmt.Errorf("exec failed: %v", err)
-	}
+	sbArgs := sbatchArgs(container)
+	log.Printf("running sbatch %+q", sbArgs)
+	return theConfig.slurm.Batch(crScript, sbArgs)
 }
 
 // Submit a container to the slurm queue (or resume monitoring if it's
@@ -313,10 +274,8 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados
 			} else if updated.Priority == 0 {
 				log.Printf("Container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
 				scancel(ctr)
-			} else if niceness(updated.Priority) != sqCheck.GetNiceness(ctr.UUID) && sqCheck.GetNiceness(ctr.UUID) != -1 {
-				// dynamically adjust priority
-				log.Printf("Container priority %v != %v", niceness(updated.Priority), sqCheck.GetNiceness(ctr.UUID))
-				scontrolUpdate(updated)
+			} else {
+				renice(updated)
 			}
 		}
 	}
@@ -324,12 +283,11 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados
 
 func scancel(ctr arvados.Container) {
 	sqCheck.L.Lock()
-	cmd := scancelCmd(ctr)
-	msg, err := cmd.CombinedOutput()
+	err := theConfig.slurm.Cancel(ctr.UUID)
 	sqCheck.L.Unlock()
 
 	if err != nil {
-		log.Printf("%q %q: %s %q", cmd.Path, cmd.Args, err, msg)
+		log.Printf("scancel: %s", err)
 		time.Sleep(time.Second)
 	} else if sqCheck.HasUUID(ctr.UUID) {
 		log.Printf("container %s is still in squeue after scancel", ctr.UUID)
@@ -337,17 +295,24 @@ func scancel(ctr arvados.Container) {
 	}
 }
 
-func scontrolUpdate(ctr arvados.Container) {
+func renice(ctr arvados.Container) {
+	nice := niceness(ctr.Priority)
+	oldnice := sqCheck.GetNiceness(ctr.UUID)
+	if nice == oldnice || oldnice == -1 {
+		return
+	}
+	log.Printf("updating slurm nice value to %d (was %d)", nice, oldnice)
 	sqCheck.L.Lock()
-	cmd := scontrolCmd(ctr)
-	msg, err := cmd.CombinedOutput()
+	err := theConfig.slurm.Renice(ctr.UUID, nice)
 	sqCheck.L.Unlock()
 
 	if err != nil {
-		log.Printf("%q %q: %s %q", cmd.Path, cmd.Args, err, msg)
+		log.Printf("renice: %s", err)
 		time.Sleep(time.Second)
-	} else if sqCheck.HasUUID(ctr.UUID) {
-		log.Printf("Container %s priority is now %v, niceness is now %v",
+		return
+	}
+	if sqCheck.HasUUID(ctr.UUID) {
+		log.Printf("container %s has arvados priority %d, slurm nice %d",
 			ctr.UUID, ctr.Priority, sqCheck.GetNiceness(ctr.UUID))
 	}
 }
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index a823755..830976d 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -7,6 +7,7 @@ package main
 import (
 	"bytes"
 	"context"
+	"errors"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -64,51 +65,55 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) {
 	arvadostest.ResetEnv()
 }
 
-func (s *TestSuite) integrationTest(c *C,
-	newSqueueCmd func() *exec.Cmd,
-	newScancelCmd func(arvados.Container) *exec.Cmd,
-	newSbatchCmd func(arvados.Container) *exec.Cmd,
-	newScontrolCmd func(arvados.Container) *exec.Cmd,
-	sbatchCmdComps []string,
-	runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
-	arvadostest.ResetEnv()
+type slurmFake struct {
+	didBatch  [][]string
+	didCancel []string
+	didRenice [][]string
+	queue     string
+	// If non-nil, run this func during the 2nd+ call to Cancel()
+	onCancel func()
+	// Error returned by Batch()
+	errBatch error
+}
 
-	arv, err := arvadosclient.MakeArvadosClient()
-	c.Assert(err, IsNil)
+func (sf *slurmFake) Batch(script io.Reader, args []string) error {
+	sf.didBatch = append(sf.didBatch, args)
+	return sf.errBatch
+}
 
-	var sbatchCmdLine []string
+func (sf *slurmFake) QueueCommand(args []string) *exec.Cmd {
+	return exec.Command("echo", sf.queue)
+}
 
-	// Override sbatchCmd
-	defer func(orig func(arvados.Container) *exec.Cmd) {
-		sbatchCmd = orig
-	}(sbatchCmd)
+func (sf *slurmFake) Renice(name string, nice int) error {
+	sf.didRenice = append(sf.didRenice, []string{name, fmt.Sprintf("%d", nice)})
+	return nil
+}
 
-	if newSbatchCmd != nil {
-		sbatchCmd = newSbatchCmd
-	} else {
-		sbatchCmd = func(container arvados.Container) *exec.Cmd {
-			sbatchCmdLine = sbatchFunc(container).Args
-			return exec.Command("sh")
-		}
+func (sf *slurmFake) Cancel(name string) error {
+	sf.didCancel = append(sf.didCancel, name)
+	if len(sf.didCancel) == 1 {
+		// simulate error on first attempt
+		return errors.New("something terrible happened")
+	}
+	if sf.onCancel != nil {
+		sf.onCancel()
 	}
+	return nil
+}
 
-	// Override squeueCmd
-	defer func(orig func() *exec.Cmd) {
-		squeueCmd = orig
-	}(squeueCmd)
-	squeueCmd = newSqueueCmd
+func (s *TestSuite) integrationTest(c *C, slurm *slurmFake,
+	expectBatch [][]string,
+	runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
+	arvadostest.ResetEnv()
 
-	// Override scancel
-	defer func(orig func(arvados.Container) *exec.Cmd) {
-		scancelCmd = orig
-	}(scancelCmd)
-	scancelCmd = newScancelCmd
+	arv, err := arvadosclient.MakeArvadosClient()
+	c.Assert(err, IsNil)
 
-	// Override scontrol
-	defer func(orig func(arvados.Container) *exec.Cmd) {
-		scontrolCmd = orig
-	}(scontrolCmd)
-	scontrolCmd = newScontrolCmd
+	defer func(orig Slurm) {
+		theConfig.slurm = orig
+	}(theConfig.slurm)
+	theConfig.slurm = slurm
 
 	// There should be one queued container
 	params := arvadosclient.Dict{
@@ -130,6 +135,7 @@ func (s *TestSuite) integrationTest(c *C,
 		RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
 			go func() {
 				runContainer(disp, ctr)
+				slurm.queue = ""
 				doneRun <- struct{}{}
 			}()
 			run(disp, ctr, status)
@@ -145,7 +151,7 @@ func (s *TestSuite) integrationTest(c *C,
 
 	sqCheck.Stop()
 
-	c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
+	c.Check(slurm.didBatch, DeepEquals, expectBatch)
 
 	// There should be no queued containers now
 	err = arv.List("containers", params, &containers)
@@ -160,77 +166,47 @@ func (s *TestSuite) integrationTest(c *C,
 }
 
 func (s *TestSuite) TestIntegrationNormal(c *C) {
-	done := false
 	container := s.integrationTest(c,
-		func() *exec.Cmd {
-			if done {
-				return exec.Command("true")
-			} else {
-				return exec.Command("echo", "zzzzz-dz642-queuedcontainer 9990 100")
-			}
-		},
-		nil,
+		&slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"},
 		nil,
-		nil,
-		[]string(nil),
 		func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
 			dispatcher.UpdateState(container.UUID, dispatch.Running)
 			time.Sleep(3 * time.Second)
 			dispatcher.UpdateState(container.UUID, dispatch.Complete)
-			done = true
 		})
 	c.Check(container.State, Equals, arvados.ContainerStateComplete)
 }
 
 func (s *TestSuite) TestIntegrationCancel(c *C) {
-	var cmd *exec.Cmd
-	var scancelCmdLine []string
-	attempt := 0
-
+	slurm := &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
+	readyToCancel := make(chan bool)
+	slurm.onCancel = func() { <-readyToCancel }
 	container := s.integrationTest(c,
-		func() *exec.Cmd {
-			if cmd != nil && cmd.ProcessState != nil {
-				return exec.Command("true")
-			} else {
-				return exec.Command("echo", "zzzzz-dz642-queuedcontainer 9990 100")
-			}
-		},
-		func(container arvados.Container) *exec.Cmd {
-			if attempt++; attempt == 1 {
-				return exec.Command("false")
-			} else {
-				scancelCmdLine = scancelFunc(container).Args
-				cmd = exec.Command("echo")
-				return cmd
-			}
-		},
-		nil,
+		slurm,
 		nil,
-		[]string(nil),
 		func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
 			dispatcher.UpdateState(container.UUID, dispatch.Running)
-			time.Sleep(1 * time.Second)
+			time.Sleep(time.Second)
 			dispatcher.Arv.Update("containers", container.UUID,
 				arvadosclient.Dict{
 					"container": arvadosclient.Dict{"priority": 0}},
 				nil)
+			readyToCancel <- true
+			close(readyToCancel)
 		})
 	c.Check(container.State, Equals, arvados.ContainerStateCancelled)
-	c.Check(scancelCmdLine, DeepEquals, []string{"scancel", "--name=zzzzz-dz642-queuedcontainer"})
+	c.Check(len(slurm.didCancel) > 1, Equals, true)
+	c.Check(slurm.didCancel[:2], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "zzzzz-dz642-queuedcontainer"})
 }
 
 func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
-	container := s.integrationTest(c,
-		func() *exec.Cmd { return exec.Command("echo") },
-		nil,
-		nil,
-		nil,
-		[]string{"sbatch",
+	container := s.integrationTest(c, &slurmFake{},
+		[][]string{{
 			fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
 			fmt.Sprintf("--mem=%d", 11445),
 			fmt.Sprintf("--cpus-per-task=%d", 4),
 			fmt.Sprintf("--tmp=%d", 45777),
-			fmt.Sprintf("--nice=%d", 9990)},
+			fmt.Sprintf("--nice=%d", 9990)}},
 		func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
 			dispatcher.UpdateState(container.UUID, dispatch.Running)
 			time.Sleep(3 * time.Second)
@@ -241,13 +217,8 @@ func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
 
 func (s *TestSuite) TestSbatchFail(c *C) {
 	container := s.integrationTest(c,
-		func() *exec.Cmd { return exec.Command("echo") },
-		nil,
-		func(container arvados.Container) *exec.Cmd {
-			return exec.Command("false")
-		},
-		nil,
-		[]string(nil),
+		&slurmFake{errBatch: errors.New("something terrible happened")},
+		[][]string{{"--job-name=zzzzz-dz642-queuedcontainer", "--mem=11445", "--cpus-per-task=4", "--tmp=45777", "--nice=9990"}},
 		func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
 			dispatcher.UpdateState(container.UUID, dispatch.Running)
 			dispatcher.UpdateState(container.UUID, dispatch.Complete)
@@ -387,71 +358,47 @@ func (s *MockArvadosServerSuite) TestSbatchFuncWithConfigArgs(c *C) {
 }
 
 func testSbatchFuncWithArgs(c *C, args []string) {
+	defer func() { theConfig.SbatchArguments = nil }()
 	theConfig.SbatchArguments = append(theConfig.SbatchArguments, args...)
 
 	container := arvados.Container{
 		UUID:               "123",
 		RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 2},
 		Priority:           1}
-	sbatchCmd := sbatchFunc(container)
 
 	var expected []string
-	expected = append(expected, "sbatch")
 	expected = append(expected, theConfig.SbatchArguments...)
 	expected = append(expected, "--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990")
-
-	c.Check(sbatchCmd.Args, DeepEquals, expected)
+	c.Check(sbatchArgs(container), DeepEquals, expected)
 }
 
 func (s *MockArvadosServerSuite) TestSbatchPartition(c *C) {
-	theConfig.SbatchArguments = nil
 	container := arvados.Container{
 		UUID:                 "123",
 		RuntimeConstraints:   arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 1},
 		SchedulingParameters: arvados.SchedulingParameters{Partitions: []string{"blurb", "b2"}},
 		Priority:             1}
-	sbatchCmd := sbatchFunc(container)
 
-	var expected []string
-	expected = append(expected, "sbatch")
-	expected = append(expected, "--job-name=123", "--mem=239", "--cpus-per-task=1", "--tmp=0", "--nice=9990", "--partition=blurb,b2")
-
-	c.Check(sbatchCmd.Args, DeepEquals, expected)
+	c.Check(sbatchArgs(container), DeepEquals, []string{
+		"--job-name=123", "--mem=239", "--cpus-per-task=1", "--tmp=0", "--nice=9990",
+		"--partition=blurb,b2",
+	})
 }
 
 func (s *TestSuite) TestIntegrationChangePriority(c *C) {
-	var scontrolCmdLine []string
-	step := 0
-
-	container := s.integrationTest(c,
-		func() *exec.Cmd {
-			if step == 0 {
-				return exec.Command("echo", "zzzzz-dz642-queuedcontainer 9990 100")
-			} else if step == 1 {
-				return exec.Command("echo", "zzzzz-dz642-queuedcontainer 4000 100")
-			} else {
-				return exec.Command("echo")
-			}
-		},
-		func(arvados.Container) *exec.Cmd { return exec.Command("true") },
-		nil,
-		func(container arvados.Container) *exec.Cmd {
-			scontrolCmdLine = scontrolFunc(container).Args
-			step = 1
-			return exec.Command("true")
-		},
-		[]string(nil),
+	slurm := &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
+	container := s.integrationTest(c, slurm, nil,
 		func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
 			dispatcher.UpdateState(container.UUID, dispatch.Running)
-			time.Sleep(1 * time.Second)
+			time.Sleep(time.Second)
 			dispatcher.Arv.Update("containers", container.UUID,
 				arvadosclient.Dict{
 					"container": arvadosclient.Dict{"priority": 600}},
 				nil)
-			time.Sleep(1 * time.Second)
-			step = 2
+			time.Sleep(time.Second)
 			dispatcher.UpdateState(container.UUID, dispatch.Complete)
 		})
 	c.Check(container.State, Equals, arvados.ContainerStateComplete)
-	c.Check(scontrolCmdLine, DeepEquals, []string{"scontrol", "update", "JobName=zzzzz-dz642-queuedcontainer", "Nice=4000"})
+	c.Assert(len(slurm.didRenice), Not(Equals), 0)
+	c.Check(slurm.didRenice[len(slurm.didRenice)-1], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "4000"})
 }
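
(For reference: the dispatcher maps Arvados priority to slurm nice via
niceness(priority) = (1000 - priority) * 10, so the priority update to 600 in
this test corresponds to the nice value 4000 asserted above.)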
diff --git a/services/crunch-dispatch-slurm/slurm.go b/services/crunch-dispatch-slurm/slurm.go
new file mode 100644
index 0000000..f675f6c
--- /dev/null
+++ b/services/crunch-dispatch-slurm/slurm.go
@@ -0,0 +1,52 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+	"fmt"
+	"io"
+	"log"
+	"os/exec"
+	"strings"
+)
+
+type Slurm interface {
+	Cancel(name string) error
+	Renice(name string, nice int) error
+	QueueCommand(args []string) *exec.Cmd
+	Batch(script io.Reader, args []string) error
+}
+
+type slurmCLI struct{}
+
+func (scli *slurmCLI) Batch(script io.Reader, args []string) error {
+	return scli.run(script, "sbatch", args)
+}
+
+func (scli *slurmCLI) Cancel(name string) error {
+	return scli.run(nil, "scancel", []string{"--name=" + name})
+}
+
+func (scli *slurmCLI) QueueCommand(args []string) *exec.Cmd {
+	return exec.Command("squeue", args...)
+}
+
+func (scli *slurmCLI) Renice(name string, nice int) error {
+	return scli.run(nil, "scontrol", []string{"update", "JobName=" + name, fmt.Sprintf("Nice=%d", nice)})
+}
+
+func (scli *slurmCLI) run(stdin io.Reader, prog string, args []string) error {
+	cmd := exec.Command(prog, args...)
+	cmd.Stdin = stdin
+	out, err := cmd.CombinedOutput()
+	outTrim := strings.TrimSpace(string(out))
+	if err != nil || len(out) > 0 {
+		log.Printf("%q %q: %q", cmd.Path, cmd.Args, outTrim)
+	}
+	if err != nil {
+		err = fmt.Errorf("%s: %s (%q)", cmd.Path, err, outTrim)
+	}
+	return err
+}
diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go
index 819c2d2..5ecfe8f 100644
--- a/services/crunch-dispatch-slurm/squeue.go
+++ b/services/crunch-dispatch-slurm/squeue.go
@@ -8,7 +8,6 @@ import (
 	"bytes"
 	"fmt"
 	"log"
-	"os/exec"
 	"strings"
 	"sync"
 	"time"
@@ -29,12 +28,6 @@ type SqueueChecker struct {
 	sync.Cond
 }
 
-func squeueFunc() *exec.Cmd {
-	return exec.Command("squeue", "--all", "--format=%j %y %Q")
-}
-
-var squeueCmd = squeueFunc
-
 // HasUUID checks if a given container UUID is in the slurm queue.
 // This does not run squeue directly, but instead blocks until woken
 // up by next successful update of squeue.
@@ -84,7 +77,7 @@ func (sqc *SqueueChecker) check() {
 	sqc.L.Lock()
 	defer sqc.L.Unlock()
 
-	cmd := squeueCmd()
+	cmd := theConfig.slurm.QueueCommand([]string{"--all", "--format=%j %y %Q"})
 	stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
 	cmd.Stdout, cmd.Stderr = stdout, stderr
 	if err := cmd.Run(); err != nil {

-----------------------------------------------------------------------


hooks/post-receive
-- 