[ARVADOS] created: 1.1.2-119-g405dea2

Git user git at public.curoverse.com
Thu Jan 18 16:42:45 EST 2018


        at  405dea2598f2e45ecc9337aa462c385cd4e893f0 (commit)


commit 405dea2598f2e45ecc9337aa462c385cd4e893f0
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Jan 18 16:41:39 2018 -0500

    12891: Avoid flapping finalState after cancel.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index a78d47d..45e48c2 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -1139,10 +1139,6 @@ func (runner *ContainerRunner) UploadOutputFile(
 
 // HandleOutput sets the output, unmounts the FUSE mount, and deletes temporary directories
 func (runner *ContainerRunner) CaptureOutput() error {
-	if runner.finalState != "Complete" {
-		return nil
-	}
-
 	if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
 		// Output may have been set directly by the container, so
 		// refresh the container record to check.
@@ -1595,7 +1591,7 @@ func (runner *ContainerRunner) Run() (err error) {
 	}
 
 	err = runner.WaitFinish()
-	if err == nil {
+	if err == nil && !runner.IsCancelled() {
 		runner.finalState = "Complete"
 	}
 	return

commit 81350383aee055b3bd0b6f25e90575b13ffa350f
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Jan 18 16:41:14 2018 -0500

    12891: Fix use of magic string.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index b8c80dd..a78d47d 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -930,7 +930,7 @@ func (runner *ContainerRunner) StartContainer() error {
 func (runner *ContainerRunner) WaitFinish() (err error) {
 	runner.CrunchLog.Print("Waiting for container to finish")
 
-	waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, "not-running")
+	waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, dockercontainer.WaitConditionNotRunning)
 
 	go func() {
 		<-runner.ArvMountExit

commit 2f512a373dbb77ec2b170387bfb4ae03c89f2281
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Jan 18 16:40:59 2018 -0500

    12891: Don't give up after trying once to stop the docker container.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 1bd4956..b8c80dd 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -150,11 +150,10 @@ func (runner *ContainerRunner) setupSignals() {
 	signal.Notify(runner.SigChan, syscall.SIGQUIT)
 
 	go func(sig chan os.Signal) {
-		s := <-sig
-		if s != nil {
-			runner.CrunchLog.Printf("Caught signal %v", s)
+		for s := range sig {
+			runner.CrunchLog.Printf("caught signal: %v", s)
+			runner.stop()
 		}
-		runner.stop()
 	}(runner.SigChan)
 }
 
@@ -162,25 +161,21 @@ func (runner *ContainerRunner) setupSignals() {
 func (runner *ContainerRunner) stop() {
 	runner.cStateLock.Lock()
 	defer runner.cStateLock.Unlock()
-	if runner.cCancelled {
+	if !runner.cStarted {
 		return
 	}
 	runner.cCancelled = true
-	if runner.cStarted {
-		timeout := time.Duration(10)
-		err := runner.Docker.ContainerStop(context.TODO(), runner.ContainerID, &(timeout))
-		if err != nil {
-			runner.CrunchLog.Printf("StopContainer failed: %s", err)
-		}
-		// Suppress multiple calls to stop()
-		runner.cStarted = false
+	runner.CrunchLog.Printf("stopping container")
+	timeout := 10 * time.Second
+	err := runner.Docker.ContainerStop(context.TODO(), runner.ContainerID, &timeout)
+	if err != nil {
+		runner.CrunchLog.Printf("error stopping container: %s", err)
 	}
 }
 
 func (runner *ContainerRunner) stopSignals() {
 	if runner.SigChan != nil {
 		signal.Stop(runner.SigChan)
-		close(runner.SigChan)
 	}
 }
 

commit c5455d7940c7838551d64f0272eb6f6b1ec04dd9
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Jan 18 16:39:16 2018 -0500

    12891: Don't use SIGKILL when telling crunch-run to cancel.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/services/crunch-dispatch-slurm/slurm.go b/services/crunch-dispatch-slurm/slurm.go
index f675f6c..067c9b8 100644
--- a/services/crunch-dispatch-slurm/slurm.go
+++ b/services/crunch-dispatch-slurm/slurm.go
@@ -26,7 +26,28 @@ func (scli *slurmCLI) Batch(script io.Reader, args []string) error {
 }
 
 func (scli *slurmCLI) Cancel(name string) error {
-	return scli.run(nil, "scancel", []string{"--name=" + name})
+	for _, args := range [][]string{
+		// If the slurm job hasn't started yet, remove it from
+		// the queue.
+		{"--state=pending"},
+		// If the slurm job has started, send SIGTERM. If we
+		// cancel a running job without a --signal argument,
+		// slurm will send SIGTERM and then (after some
+		// site-configured interval) SIGKILL. This would kill
+		// crunch-run without stopping the container, which we
+		// don't want.
+		{"--state=running", "--signal=TERM"},
+		{"--state=suspended", "--signal=TERM"},
+	} {
+		err := scli.run(nil, "scancel", append([]string{"--name=" + name}, args...))
+		if err != nil {
+			// Note: an error here can't mean "no such
+			// job": `scancel --name=x state=y ...`
+			// succeeds with no output if no job matches.
+			return err
+		}
+	}
+	return nil
 }
 
 func (scli *slurmCLI) QueueCommand(args []string) *exec.Cmd {

commit 6846bec056e246cd68a2881b7b0edffe0bea24b6
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Jan 18 16:28:39 2018 -0500

    12891: Refactor slurm commands.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 3c89103..aec4cc3 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -7,14 +7,12 @@ package main
 // Dispatcher service for Crunch that submits containers to the slurm queue.
 
 import (
-	"bytes"
 	"context"
 	"flag"
 	"fmt"
 	"log"
 	"math"
 	"os"
-	"os/exec"
 	"regexp"
 	"strings"
 	"time"
@@ -43,9 +41,12 @@ type Config struct {
 
 	// Minimum time between two attempts to run the same container
 	MinRetryPeriod arvados.Duration
+
+	slurm Slurm
 }
 
 func main() {
+	theConfig.slurm = &slurmCLI{}
 	err := doMain()
 	if err != nil {
 		log.Fatal(err)
@@ -175,8 +176,7 @@ func niceness(priority int) int {
 	return (1000 - priority) * 10
 }
 
-// sbatchCmd
-func sbatchFunc(container arvados.Container) *exec.Cmd {
+func sbatchArgs(container arvados.Container) []string {
 	mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM) / float64(1048576)))
 
 	var disk int64
@@ -198,61 +198,22 @@ func sbatchFunc(container arvados.Container) *exec.Cmd {
 		sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
 	}
 
-	return exec.Command("sbatch", sbatchArgs...)
-}
-
-// scancelCmd
-func scancelFunc(container arvados.Container) *exec.Cmd {
-	return exec.Command("scancel", "--name="+container.UUID)
-}
-
-// scontrolCmd
-func scontrolFunc(container arvados.Container) *exec.Cmd {
-	return exec.Command("scontrol", "update", "JobName="+container.UUID, fmt.Sprintf("Nice=%d", niceness(container.Priority)))
+	return sbatchArgs
 }
 
-// Wrap these so that they can be overridden by tests
-var sbatchCmd = sbatchFunc
-var scancelCmd = scancelFunc
-var scontrolCmd = scontrolFunc
-
-// Submit job to slurm using sbatch.
 func submit(dispatcher *dispatch.Dispatcher, container arvados.Container, crunchRunCommand []string) error {
-	cmd := sbatchCmd(container)
-
-	// Send a tiny script on stdin to execute the crunch-run
-	// command (slurm requires this to be a #! script)
-
 	// append() here avoids modifying crunchRunCommand's
 	// underlying array, which is shared with other goroutines.
-	args := append([]string(nil), crunchRunCommand...)
-	args = append(args, container.UUID)
-	cmd.Stdin = strings.NewReader(execScript(args))
-
-	var stdout, stderr bytes.Buffer
-	cmd.Stdout = &stdout
-	cmd.Stderr = &stderr
+	crArgs := append([]string(nil), crunchRunCommand...)
+	crArgs = append(crArgs, container.UUID)
+	crScript := strings.NewReader(execScript(crArgs))
 
-	// Mutex between squeue sync and running sbatch or scancel.
 	sqCheck.L.Lock()
 	defer sqCheck.L.Unlock()
 
-	log.Printf("exec sbatch %+q", cmd.Args)
-	err := cmd.Run()
-
-	switch err.(type) {
-	case nil:
-		log.Printf("sbatch succeeded: %q", strings.TrimSpace(stdout.String()))
-		return nil
-
-	case *exec.ExitError:
-		dispatcher.Unlock(container.UUID)
-		return fmt.Errorf("sbatch %+q failed: %v (stderr: %q)", cmd.Args, err, stderr.Bytes())
-
-	default:
-		dispatcher.Unlock(container.UUID)
-		return fmt.Errorf("exec failed: %v", err)
-	}
+	sbArgs := sbatchArgs(container)
+	log.Printf("running sbatch %+q", sbArgs)
+	return theConfig.slurm.Batch(crScript, sbArgs)
 }
 
 // Submit a container to the slurm queue (or resume monitoring if it's
@@ -313,10 +274,9 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados
 			} else if updated.Priority == 0 {
 				log.Printf("Container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
 				scancel(ctr)
-			} else if niceness(updated.Priority) != sqCheck.GetNiceness(ctr.UUID) && sqCheck.GetNiceness(ctr.UUID) != -1 {
-				// dynamically adjust priority
-				log.Printf("Container priority %v != %v", niceness(updated.Priority), sqCheck.GetNiceness(ctr.UUID))
-				scontrolUpdate(updated)
+			} else {
+				renice(updated)
+
 			}
 		}
 	}
@@ -324,12 +284,11 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados
 
 func scancel(ctr arvados.Container) {
 	sqCheck.L.Lock()
-	cmd := scancelCmd(ctr)
-	msg, err := cmd.CombinedOutput()
+	err := theConfig.slurm.Cancel(ctr.UUID)
 	sqCheck.L.Unlock()
 
 	if err != nil {
-		log.Printf("%q %q: %s %q", cmd.Path, cmd.Args, err, msg)
+		log.Printf("scancel: %s", err)
 		time.Sleep(time.Second)
 	} else if sqCheck.HasUUID(ctr.UUID) {
 		log.Printf("container %s is still in squeue after scancel", ctr.UUID)
@@ -337,17 +296,24 @@ func scancel(ctr arvados.Container) {
 	}
 }
 
-func scontrolUpdate(ctr arvados.Container) {
+func renice(ctr arvados.Container) {
+	nice := niceness(ctr.Priority)
+	oldnice := sqCheck.GetNiceness(ctr.UUID)
+	if nice == oldnice || oldnice == -1 {
+		return
+	}
+	log.Printf("updating slurm nice value to %d (was %d)", nice, oldnice)
 	sqCheck.L.Lock()
-	cmd := scontrolCmd(ctr)
-	msg, err := cmd.CombinedOutput()
+	err := theConfig.slurm.Renice(ctr.UUID, nice)
 	sqCheck.L.Unlock()
 
 	if err != nil {
-		log.Printf("%q %q: %s %q", cmd.Path, cmd.Args, err, msg)
+		log.Printf("renice: %s", err)
 		time.Sleep(time.Second)
-	} else if sqCheck.HasUUID(ctr.UUID) {
-		log.Printf("Container %s priority is now %v, niceness is now %v",
+		return
+	}
+	if sqCheck.HasUUID(ctr.UUID) {
+		log.Printf("container %s has arvados priority %d, slurm nice %d",
 			ctr.UUID, ctr.Priority, sqCheck.GetNiceness(ctr.UUID))
 	}
 }
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index a823755..830976d 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -7,6 +7,7 @@ package main
 import (
 	"bytes"
 	"context"
+	"errors"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -64,51 +65,55 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) {
 	arvadostest.ResetEnv()
 }
 
-func (s *TestSuite) integrationTest(c *C,
-	newSqueueCmd func() *exec.Cmd,
-	newScancelCmd func(arvados.Container) *exec.Cmd,
-	newSbatchCmd func(arvados.Container) *exec.Cmd,
-	newScontrolCmd func(arvados.Container) *exec.Cmd,
-	sbatchCmdComps []string,
-	runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
-	arvadostest.ResetEnv()
+type slurmFake struct {
+	didBatch  [][]string
+	didCancel []string
+	didRenice [][]string
+	queue     string
+	// If non-nil, run this func during the 2nd+ call to Cancel()
+	onCancel func()
+	// Error returned by Batch()
+	errBatch error
+}
 
-	arv, err := arvadosclient.MakeArvadosClient()
-	c.Assert(err, IsNil)
+func (sf *slurmFake) Batch(script io.Reader, args []string) error {
+	sf.didBatch = append(sf.didBatch, args)
+	return sf.errBatch
+}
 
-	var sbatchCmdLine []string
+func (sf *slurmFake) QueueCommand(args []string) *exec.Cmd {
+	return exec.Command("echo", sf.queue)
+}
 
-	// Override sbatchCmd
-	defer func(orig func(arvados.Container) *exec.Cmd) {
-		sbatchCmd = orig
-	}(sbatchCmd)
+func (sf *slurmFake) Renice(name string, nice int) error {
+	sf.didRenice = append(sf.didRenice, []string{name, fmt.Sprintf("%d", nice)})
+	return nil
+}
 
-	if newSbatchCmd != nil {
-		sbatchCmd = newSbatchCmd
-	} else {
-		sbatchCmd = func(container arvados.Container) *exec.Cmd {
-			sbatchCmdLine = sbatchFunc(container).Args
-			return exec.Command("sh")
-		}
+func (sf *slurmFake) Cancel(name string) error {
+	sf.didCancel = append(sf.didCancel, name)
+	if len(sf.didCancel) == 1 {
+		// simulate error on first attempt
+		return errors.New("something terrible happened")
+	}
+	if sf.onCancel != nil {
+		sf.onCancel()
 	}
+	return nil
+}
 
-	// Override squeueCmd
-	defer func(orig func() *exec.Cmd) {
-		squeueCmd = orig
-	}(squeueCmd)
-	squeueCmd = newSqueueCmd
+func (s *TestSuite) integrationTest(c *C, slurm *slurmFake,
+	expectBatch [][]string,
+	runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
+	arvadostest.ResetEnv()
 
-	// Override scancel
-	defer func(orig func(arvados.Container) *exec.Cmd) {
-		scancelCmd = orig
-	}(scancelCmd)
-	scancelCmd = newScancelCmd
+	arv, err := arvadosclient.MakeArvadosClient()
+	c.Assert(err, IsNil)
 
-	// Override scontrol
-	defer func(orig func(arvados.Container) *exec.Cmd) {
-		scontrolCmd = orig
-	}(scontrolCmd)
-	scontrolCmd = newScontrolCmd
+	defer func(orig Slurm) {
+		theConfig.slurm = orig
+	}(theConfig.slurm)
+	theConfig.slurm = slurm
 
 	// There should be one queued container
 	params := arvadosclient.Dict{
@@ -130,6 +135,7 @@ func (s *TestSuite) integrationTest(c *C,
 		RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
 			go func() {
 				runContainer(disp, ctr)
+				slurm.queue = ""
 				doneRun <- struct{}{}
 			}()
 			run(disp, ctr, status)
@@ -145,7 +151,7 @@ func (s *TestSuite) integrationTest(c *C,
 
 	sqCheck.Stop()
 
-	c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
+	c.Check(slurm.didBatch, DeepEquals, expectBatch)
 
 	// There should be no queued containers now
 	err = arv.List("containers", params, &containers)
@@ -160,77 +166,47 @@ func (s *TestSuite) integrationTest(c *C,
 }
 
 func (s *TestSuite) TestIntegrationNormal(c *C) {
-	done := false
 	container := s.integrationTest(c,
-		func() *exec.Cmd {
-			if done {
-				return exec.Command("true")
-			} else {
-				return exec.Command("echo", "zzzzz-dz642-queuedcontainer 9990 100")
-			}
-		},
-		nil,
+		&slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"},
 		nil,
-		nil,
-		[]string(nil),
 		func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
 			dispatcher.UpdateState(container.UUID, dispatch.Running)
 			time.Sleep(3 * time.Second)
 			dispatcher.UpdateState(container.UUID, dispatch.Complete)
-			done = true
 		})
 	c.Check(container.State, Equals, arvados.ContainerStateComplete)
 }
 
 func (s *TestSuite) TestIntegrationCancel(c *C) {
-	var cmd *exec.Cmd
-	var scancelCmdLine []string
-	attempt := 0
-
+	slurm := &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
+	readyToCancel := make(chan bool)
+	slurm.onCancel = func() { <-readyToCancel }
 	container := s.integrationTest(c,
-		func() *exec.Cmd {
-			if cmd != nil && cmd.ProcessState != nil {
-				return exec.Command("true")
-			} else {
-				return exec.Command("echo", "zzzzz-dz642-queuedcontainer 9990 100")
-			}
-		},
-		func(container arvados.Container) *exec.Cmd {
-			if attempt++; attempt == 1 {
-				return exec.Command("false")
-			} else {
-				scancelCmdLine = scancelFunc(container).Args
-				cmd = exec.Command("echo")
-				return cmd
-			}
-		},
-		nil,
+		slurm,
 		nil,
-		[]string(nil),
 		func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
 			dispatcher.UpdateState(container.UUID, dispatch.Running)
-			time.Sleep(1 * time.Second)
+			time.Sleep(time.Second)
 			dispatcher.Arv.Update("containers", container.UUID,
 				arvadosclient.Dict{
 					"container": arvadosclient.Dict{"priority": 0}},
 				nil)
+			readyToCancel <- true
+			close(readyToCancel)
 		})
 	c.Check(container.State, Equals, arvados.ContainerStateCancelled)
-	c.Check(scancelCmdLine, DeepEquals, []string{"scancel", "--name=zzzzz-dz642-queuedcontainer"})
+	c.Check(len(slurm.didCancel) > 1, Equals, true)
+	c.Check(slurm.didCancel[:2], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "zzzzz-dz642-queuedcontainer"})
 }
 
 func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
-	container := s.integrationTest(c,
-		func() *exec.Cmd { return exec.Command("echo") },
-		nil,
-		nil,
-		nil,
-		[]string{"sbatch",
+	container := s.integrationTest(c, &slurmFake{},
+		[][]string{{
 			fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
 			fmt.Sprintf("--mem=%d", 11445),
 			fmt.Sprintf("--cpus-per-task=%d", 4),
 			fmt.Sprintf("--tmp=%d", 45777),
-			fmt.Sprintf("--nice=%d", 9990)},
+			fmt.Sprintf("--nice=%d", 9990)}},
 		func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
 			dispatcher.UpdateState(container.UUID, dispatch.Running)
 			time.Sleep(3 * time.Second)
@@ -241,13 +217,8 @@ func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
 
 func (s *TestSuite) TestSbatchFail(c *C) {
 	container := s.integrationTest(c,
-		func() *exec.Cmd { return exec.Command("echo") },
-		nil,
-		func(container arvados.Container) *exec.Cmd {
-			return exec.Command("false")
-		},
-		nil,
-		[]string(nil),
+		&slurmFake{errBatch: errors.New("something terrible happened")},
+		[][]string{{"--job-name=zzzzz-dz642-queuedcontainer", "--mem=11445", "--cpus-per-task=4", "--tmp=45777", "--nice=9990"}},
 		func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
 			dispatcher.UpdateState(container.UUID, dispatch.Running)
 			dispatcher.UpdateState(container.UUID, dispatch.Complete)
@@ -387,71 +358,47 @@ func (s *MockArvadosServerSuite) TestSbatchFuncWithConfigArgs(c *C) {
 }
 
 func testSbatchFuncWithArgs(c *C, args []string) {
+	defer func() { theConfig.SbatchArguments = nil }()
 	theConfig.SbatchArguments = append(theConfig.SbatchArguments, args...)
 
 	container := arvados.Container{
 		UUID:               "123",
 		RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 2},
 		Priority:           1}
-	sbatchCmd := sbatchFunc(container)
 
 	var expected []string
-	expected = append(expected, "sbatch")
 	expected = append(expected, theConfig.SbatchArguments...)
 	expected = append(expected, "--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990")
-
-	c.Check(sbatchCmd.Args, DeepEquals, expected)
+	c.Check(sbatchArgs(container), DeepEquals, expected)
 }
 
 func (s *MockArvadosServerSuite) TestSbatchPartition(c *C) {
-	theConfig.SbatchArguments = nil
 	container := arvados.Container{
 		UUID:                 "123",
 		RuntimeConstraints:   arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 1},
 		SchedulingParameters: arvados.SchedulingParameters{Partitions: []string{"blurb", "b2"}},
 		Priority:             1}
-	sbatchCmd := sbatchFunc(container)
 
-	var expected []string
-	expected = append(expected, "sbatch")
-	expected = append(expected, "--job-name=123", "--mem=239", "--cpus-per-task=1", "--tmp=0", "--nice=9990", "--partition=blurb,b2")
-
-	c.Check(sbatchCmd.Args, DeepEquals, expected)
+	c.Check(sbatchArgs(container), DeepEquals, []string{
+		"--job-name=123", "--mem=239", "--cpus-per-task=1", "--tmp=0", "--nice=9990",
+		"--partition=blurb,b2",
+	})
 }
 
 func (s *TestSuite) TestIntegrationChangePriority(c *C) {
-	var scontrolCmdLine []string
-	step := 0
-
-	container := s.integrationTest(c,
-		func() *exec.Cmd {
-			if step == 0 {
-				return exec.Command("echo", "zzzzz-dz642-queuedcontainer 9990 100")
-			} else if step == 1 {
-				return exec.Command("echo", "zzzzz-dz642-queuedcontainer 4000 100")
-			} else {
-				return exec.Command("echo")
-			}
-		},
-		func(arvados.Container) *exec.Cmd { return exec.Command("true") },
-		nil,
-		func(container arvados.Container) *exec.Cmd {
-			scontrolCmdLine = scontrolFunc(container).Args
-			step = 1
-			return exec.Command("true")
-		},
-		[]string(nil),
+	slurm := &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
+	container := s.integrationTest(c, slurm, nil,
 		func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
 			dispatcher.UpdateState(container.UUID, dispatch.Running)
-			time.Sleep(1 * time.Second)
+			time.Sleep(time.Second)
 			dispatcher.Arv.Update("containers", container.UUID,
 				arvadosclient.Dict{
 					"container": arvadosclient.Dict{"priority": 600}},
 				nil)
-			time.Sleep(1 * time.Second)
-			step = 2
+			time.Sleep(time.Second)
 			dispatcher.UpdateState(container.UUID, dispatch.Complete)
 		})
 	c.Check(container.State, Equals, arvados.ContainerStateComplete)
-	c.Check(scontrolCmdLine, DeepEquals, []string{"scontrol", "update", "JobName=zzzzz-dz642-queuedcontainer", "Nice=4000"})
+	c.Assert(len(slurm.didRenice), Not(Equals), 0)
+	c.Check(slurm.didRenice[len(slurm.didRenice)-1], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "4000"})
 }
diff --git a/services/crunch-dispatch-slurm/slurm.go b/services/crunch-dispatch-slurm/slurm.go
new file mode 100644
index 0000000..f675f6c
--- /dev/null
+++ b/services/crunch-dispatch-slurm/slurm.go
@@ -0,0 +1,52 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+	"fmt"
+	"io"
+	"log"
+	"os/exec"
+	"strings"
+)
+
+type Slurm interface {
+	Cancel(name string) error
+	Renice(name string, nice int) error
+	QueueCommand(args []string) *exec.Cmd
+	Batch(script io.Reader, args []string) error
+}
+
+type slurmCLI struct{}
+
+func (scli *slurmCLI) Batch(script io.Reader, args []string) error {
+	return scli.run(script, "sbatch", args)
+}
+
+func (scli *slurmCLI) Cancel(name string) error {
+	return scli.run(nil, "scancel", []string{"--name=" + name})
+}
+
+func (scli *slurmCLI) QueueCommand(args []string) *exec.Cmd {
+	return exec.Command("squeue", args...)
+}
+
+func (scli *slurmCLI) Renice(name string, nice int) error {
+	return scli.run(nil, "scontrol", []string{"update", "JobName=" + name, fmt.Sprintf("Nice=%d", nice)})
+}
+
+func (scli *slurmCLI) run(stdin io.Reader, prog string, args []string) error {
+	cmd := exec.Command(prog, args...)
+	cmd.Stdin = stdin
+	out, err := cmd.CombinedOutput()
+	outTrim := strings.TrimSpace(string(out))
+	if err != nil || len(out) > 0 {
+		log.Printf("%q %q: %q", cmd.Path, cmd.Args, outTrim)
+	}
+	if err != nil {
+		err = fmt.Errorf("%s: %s (%q)", cmd.Path, err, outTrim)
+	}
+	return err
+}
diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go
index 819c2d2..5ecfe8f 100644
--- a/services/crunch-dispatch-slurm/squeue.go
+++ b/services/crunch-dispatch-slurm/squeue.go
@@ -8,7 +8,6 @@ import (
 	"bytes"
 	"fmt"
 	"log"
-	"os/exec"
 	"strings"
 	"sync"
 	"time"
@@ -29,12 +28,6 @@ type SqueueChecker struct {
 	sync.Cond
 }
 
-func squeueFunc() *exec.Cmd {
-	return exec.Command("squeue", "--all", "--format=%j %y %Q")
-}
-
-var squeueCmd = squeueFunc
-
 // HasUUID checks if a given container UUID is in the slurm queue.
 // This does not run squeue directly, but instead blocks until woken
 // up by next successful update of squeue.
@@ -84,7 +77,7 @@ func (sqc *SqueueChecker) check() {
 	sqc.L.Lock()
 	defer sqc.L.Unlock()
 
-	cmd := squeueCmd()
+	cmd := theConfig.slurm.QueueCommand([]string{"--all", "--format=%j %y %Q"})
 	stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
 	cmd.Stdout, cmd.Stderr = stdout, stderr
 	if err := cmd.Run(); err != nil {

commit 31007a99c336423f2b34a306028ec0aa41b1dd3a
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Wed Jan 17 17:21:18 2018 -0500

    12891: Remove superfluous ThinDockerClientProxy wrapper.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index b480c06..1bd4956 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -6,7 +6,6 @@ package main
 
 import (
 	"bytes"
-	"context"
 	"encoding/json"
 	"errors"
 	"flag"
@@ -33,6 +32,7 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/keepclient"
 	"git.curoverse.com/arvados.git/sdk/go/manifest"
+	"golang.org/x/net/context"
 
 	dockertypes "github.com/docker/docker/api/types"
 	dockercontainer "github.com/docker/docker/api/types/container"
@@ -82,53 +82,6 @@ type ThinDockerClient interface {
 	ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
 }
 
-// ThinDockerClientProxy is a proxy implementation of ThinDockerClient
-// that executes the docker requests on dockerclient.Client
-type ThinDockerClientProxy struct {
-	Docker *dockerclient.Client
-}
-
-// ContainerAttach invokes dockerclient.Client.ContainerAttach
-func (proxy ThinDockerClientProxy) ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error) {
-	return proxy.Docker.ContainerAttach(ctx, container, options)
-}
-
-// ContainerCreate invokes dockerclient.Client.ContainerCreate
-func (proxy ThinDockerClientProxy) ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
-	networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) {
-	return proxy.Docker.ContainerCreate(ctx, config, hostConfig, networkingConfig, containerName)
-}
-
-// ContainerStart invokes dockerclient.Client.ContainerStart
-func (proxy ThinDockerClientProxy) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error {
-	return proxy.Docker.ContainerStart(ctx, container, options)
-}
-
-// ContainerStop invokes dockerclient.Client.ContainerStop
-func (proxy ThinDockerClientProxy) ContainerStop(ctx context.Context, container string, timeout *time.Duration) error {
-	return proxy.Docker.ContainerStop(ctx, container, timeout)
-}
-
-// ContainerWait invokes dockerclient.Client.ContainerWait
-func (proxy ThinDockerClientProxy) ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) {
-	return proxy.Docker.ContainerWait(ctx, container, condition)
-}
-
-// ImageInspectWithRaw invokes dockerclient.Client.ImageInspectWithRaw
-func (proxy ThinDockerClientProxy) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
-	return proxy.Docker.ImageInspectWithRaw(ctx, image)
-}
-
-// ImageLoad invokes dockerclient.Client.ImageLoad
-func (proxy ThinDockerClientProxy) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) {
-	return proxy.Docker.ImageLoad(ctx, input, quiet)
-}
-
-// ImageRemove invokes dockerclient.Client.ImageRemove
-func (proxy ThinDockerClientProxy) ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error) {
-	return proxy.Docker.ImageRemove(ctx, image, options)
-}
-
 // ContainerRunner is the main stateful struct used for a single execution of a
 // container.
 type ContainerRunner struct {
@@ -1739,10 +1692,8 @@ func main() {
 	// API version 1.21 corresponds to Docker 1.9, which is currently the
 	// minimum version we want to support.
 	docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
-	dockerClientProxy := ThinDockerClientProxy{Docker: docker}
-
-	cr := NewContainerRunner(api, kc, dockerClientProxy, containerId)
 
+	cr := NewContainerRunner(api, kc, docker, containerId)
 	if dockererr != nil {
 		cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
 		cr.checkBrokenNode(dockererr)
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index 4979cf8..a524e48 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/services/crunch-run/crunchrun_test.go
@@ -7,7 +7,6 @@ package main
 import (
 	"bufio"
 	"bytes"
-	"context"
 	"crypto/md5"
 	"encoding/json"
 	"errors"
@@ -30,6 +29,7 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
 	"git.curoverse.com/arvados.git/sdk/go/manifest"
+	"golang.org/x/net/context"
 
 	dockertypes "github.com/docker/docker/api/types"
 	dockercontainer "github.com/docker/docker/api/types/container"

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list