[ARVADOS] created: 1.2.0-219-g09a692078

Git user git at public.curoverse.com
Thu Oct 18 13:34:36 EDT 2018


        at  09a692078f06f2cded026f0a5b95f71ea706f41f (commit)


commit 09a692078f06f2cded026f0a5b95f71ea706f41f
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Oct 18 13:28:05 2018 -0400

    14328: Cancel if container ends but ContainerWait does not return.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/services/crunch-run/cmd_watchdog.go b/services/crunch-run/cmd_watchdog.go
new file mode 100644
index 000000000..79f038bda
--- /dev/null
+++ b/services/crunch-run/cmd_watchdog.go
@@ -0,0 +1,76 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+	"bytes"
+	"os/exec"
+	"strings"
+	"time"
+)
+
+const (
+	dockerPSGracePeriod  = time.Minute     // grace given to cmdWatchdog: how long the ID may be missing before the channel closes
+	dockerPSPollInterval = 5 * time.Second // tick given to cmdWatchdog: delay between consecutive `docker ps` runs
+)
+
+// dockerPSWatchdog returns a new channel, which closes if container cid
+// drops out of `docker ps -q --no-trunc` after being listed at least once.
+// NOTE(review): assumes cid is a full 64-char ID; plain -q prints only 12 chars.
+func dockerPSWatchdog(cid string, logf func(string, ...interface{})) <-chan struct{} {
+	return cmdWatchdog(cid, dockerPSPollInterval, dockerPSGracePeriod, logf, "docker", "ps", "-q", "--no-trunc")
+}
+
+// cmdWatchdog runs prog with args once per tick. The returned channel closes
+// when a line of stdout has equalled expect at least once and has since been
+// missing for more than grace; failed runs are logged via logf and skipped.
+func cmdWatchdog(expect string, tick, grace time.Duration, logf func(string, ...interface{}), prog string, args ...string) <-chan struct{} {
+	ch := make(chan struct{})
+	go func() {
+		defer close(ch)
+		// Stop the ticker when this goroutine exits so it is not leaked.
+		ticker := time.NewTicker(tick)
+		defer ticker.Stop()
+		expect := []byte(expect)
+		errLast := false
+		var whenPresent, whenMissing time.Time
+		for range ticker.C {
+			var stderr bytes.Buffer
+			cmd := exec.Command(prog, args...)
+			cmd.Stderr = &stderr
+			stdout, err := cmd.Output()
+			if err != nil {
+				if !errLast {
+					// Log only the first of consecutive failures to limit log spam.
+					logf("%s %s: %s (%q)", prog, strings.Join(args, " "), err, stderr.Bytes())
+					errLast = true
+				}
+				continue
+			}
+			errLast = false
+			found := false
+			for _, got := range bytes.Split(stdout, []byte{'\n'}) {
+				if bytes.Equal(expect, got) {
+					found = true
+					break
+				}
+			}
+			if found {
+				whenPresent = time.Now()
+				whenMissing = time.Time{}
+				continue
+			}
+			if whenMissing.IsZero() {
+				whenMissing = time.Now() // first successful run of the current missing streak
+				continue
+			}
+			if !whenPresent.IsZero() && time.Since(whenMissing) > grace {
+				logf("%q disappeared %d seconds ago according to %s %s", expect, int(time.Since(whenMissing).Seconds()), prog, strings.Join(args, " "))
+				return
+			}
+		}
+	}()
+	return ch
+}
diff --git a/services/crunch-run/cmd_watchdog_test.go b/services/crunch-run/cmd_watchdog_test.go
new file mode 100644
index 000000000..e45957e74
--- /dev/null
+++ b/services/crunch-run/cmd_watchdog_test.go
@@ -0,0 +1,117 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"time"
+
+	check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&CmdWatchdogSuite{}) // registers this suite with the gocheck runner
+
+type CmdWatchdogSuite struct{} // holds no state; every test builds its fixture via s.test
+
+func (s *CmdWatchdogSuite) TestAppearThenError(c *check.C) {
+	c.Check(s.test(c, func(step time.Duration, show, _, breakCmd func()) { // must stay open: failed runs are not absence
+		time.Sleep(step)
+		show() // the expected line is present from about tick 1
+		time.Sleep(step * 10)
+		breakCmd() // from about tick 11 on, every run of the command fails
+	}), check.Equals, false)
+}
+
+func (s *CmdWatchdogSuite) TestAppearThenDisappear(c *check.C) {
+	c.Check(s.test(c, func(step time.Duration, show, hide, _ func()) { // must close: seen once, then gone past the grace period
+		time.Sleep(step)
+		show() // the expected line is present from about tick 1
+		time.Sleep(step * 10)
+		hide() // the command keeps succeeding, but the line is gone from about tick 11
+	}), check.Equals, true)
+}
+
+func (s *CmdWatchdogSuite) TestAppearThenDisappearThenFail(c *check.C) {
+	c.Check(s.test(c, func(step time.Duration, show, hide, breakCmd func()) { // must stay open: no successful run confirms the absence after the grace period
+		time.Sleep(step)
+		show()
+		time.Sleep(step * 10)
+		hide() // the line is missing from about tick 11
+		time.Sleep(step * 2)
+		breakCmd() // two ticks later the command starts failing and never recovers
+	}), check.Equals, false)
+}
+
+func (s *CmdWatchdogSuite) TestAppearThenDisappearThenFailThenDisappear(c *check.C) {
+	c.Check(s.test(c, func(step time.Duration, show, hide, breakCmd func()) { // must close: failed runs do not restart the grace period
+		time.Sleep(step)
+		show()
+		time.Sleep(step * 10)
+		hide() // the line is missing from about tick 11
+		time.Sleep(step * 5)
+		breakCmd() // the command fails for about five ticks
+		time.Sleep(step * 5)
+		hide() // the command succeeds again and the line is still missing
+	}), check.Equals, true)
+}
+
+func (s *CmdWatchdogSuite) TestAppearThenFailThenAppear(c *check.C) {
+	c.Check(s.test(c, func(step time.Duration, show, _, breakCmd func()) { // must stay open: no successful run ever lacks the line
+		time.Sleep(step)
+		show()
+		time.Sleep(step * 5)
+		breakCmd() // the command fails for about 20 ticks, twice the grace period
+		time.Sleep(step * 20)
+		show() // the line is there again as soon as the command succeeds
+	}), check.Equals, false)
+}
+
+func (s *CmdWatchdogSuite) TestNeverAppear(c *check.C) {
+	c.Check(s.test(c, func(_ time.Duration, _, hide, _ func()) {
+		hide() // the line never shows up, and absence only counts once it has been seen
+	}), check.Equals, false)
+}
+
+func (s *CmdWatchdogSuite) TestNeverSucceed(c *check.C) {
+	c.Check(s.test(c, func(_ time.Duration, _, _, breakCmd func()) {
+		breakCmd() // the file is never created, so no run of the command ever succeeds
+	}), check.Equals, false)
+}
+
+// test runs updateOutcomes in its own goroutine while a watchdog polls
+// "cat <scratch file>" every tick (1ms) with a grace period of 10 ticks,
+// and returns true if the watchdog channel closes within 75 ticks.
+func (s *CmdWatchdogSuite) test(c *check.C, updateOutcomes func(tick time.Duration, appear, disappear, fail func())) bool {
+	const tick = time.Millisecond
+	dir, err := ioutil.TempDir("", "")
+	c.Assert(err, check.IsNil)
+	defer os.RemoveAll(dir)
+	fnm := filepath.Join(dir, "file")
+	// writeFile builds the new content in a temp file and renames it over fnm.
+	writeFile := func(data string) {
+		tmpf, err := ioutil.TempFile(dir, "")
+		c.Assert(err, check.IsNil)
+		defer os.Remove(tmpf.Name()) // no-op once the rename has succeeded
+		_, err = tmpf.WriteString(data)
+		c.Check(err, check.IsNil)
+		c.Check(tmpf.Close(), check.IsNil) // release the descriptor before renaming
+		c.Check(os.Rename(tmpf.Name(), fnm), check.IsNil)
+	}
+	appear := func() { writeFile("foo\nexpectme\nbar\n") }
+	disappear := func() { writeFile("foo\nbar\n") }
+	// fail makes "cat" exit non-zero; the Remove error is ignored because fnm may not exist yet.
+	fail := func() { os.Remove(fnm) }
+	go updateOutcomes(tick, appear, disappear, fail)
+	ch := cmdWatchdog("expectme", tick, 10*tick, c.Logf, "cat", fnm)
+	// Report whether the channel closed before the 75-tick deadline.
+	select {
+	case <-time.After(tick * 75):
+		return false
+	case <-ch:
+		return true
+	}
+}
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index be98a3ee1..0b97ad286 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -1124,6 +1124,8 @@ func (runner *ContainerRunner) WaitFinish() error {
 		runTimeExceeded = time.After(time.Duration(timeout) * time.Second)
 	}
 
+	containerDisappeared := dockerPSWatchdog(runner.ContainerID, runner.CrunchLog.Printf)
+
 	containerdGone := make(chan error)
 	defer close(containerdGone)
 	if runner.checkContainerd > 0 {
@@ -1171,6 +1173,9 @@ func (runner *ContainerRunner) WaitFinish() error {
 			runner.stop(nil)
 			runTimeExceeded = nil
 
+		case <-containerDisappeared:
+			return errors.New("docker client never returned status")
+
 		case err := <-containerdGone:
 			return err
 		}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list