[ARVADOS] updated: c106221bf8c90fd80ca2c19a578fd7faec985229
Git user
git at public.curoverse.com
Sat Feb 25 02:03:53 EST 2017
Summary of changes:
services/crunch-run/crunchrun.go | 10 +++---
services/crunch-run/crunchrun_test.go | 65 ++++++++++++++++++++++-------------
services/crunch-run/logging.go | 38 +++++++++++---------
3 files changed, 69 insertions(+), 44 deletions(-)
discards 1042f200521bed5c9bb658d24afc442d552a76fa (commit)
via c106221bf8c90fd80ca2c19a578fd7faec985229 (commit)
via 2ac112e2364b009166e0f67a3455fffb3cccc16c (commit)
This update added new revisions after undoing existing revisions. That is
to say, the old revision is no longer an ancestor of the new revision: some
commits that were on the branch have been dropped from it. This situation
occurs when you --force push a change and generate a repository
containing something like this:
 * -- * -- B -- O -- O -- O (1042f200521bed5c9bb658d24afc442d552a76fa)
            \
             N -- N -- N (c106221bf8c90fd80ca2c19a578fd7faec985229)
When this happens, we assume that you have already received alert emails
for all of the O revisions, so here we report only the revisions on the N
branch since the common base, B.
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list them
in full below.
commit c106221bf8c90fd80ca2c19a578fd7faec985229
Author: Tom Clegg <tom at curoverse.com>
Date: Sat Feb 25 01:56:53 2017 -0500
10777: Close and flush logs right away instead of waiting for next tick.
diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go
index 4f8f95c..5254ff6 100644
--- a/services/crunch-run/logging.go
+++ b/services/crunch-run/logging.go
@@ -4,11 +4,12 @@ import (
"bufio"
"bytes"
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"io"
"log"
"sync"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
)
// Timestamper is the signature for a function that takes a timestamp and
@@ -31,9 +32,9 @@ type ThrottledLogger struct {
*log.Logger
buf *bytes.Buffer
sync.Mutex
- writer io.WriteCloser
- stop bool
- flusherDone chan bool
+ writer io.WriteCloser
+ stopping chan struct{}
+ stopped chan struct{}
Timestamper
Immediate *log.Logger
}
@@ -80,10 +81,13 @@ func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
func (tl *ThrottledLogger) flusher() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
- for range ticker.C {
- // We use a separate "stopping" var here to ensure we flush
- // tl.buf after tl.stop becomes true.
- stopping := tl.stop
+ for stopping := false; !stopping; {
+ select {
+ case <-tl.stopping:
+ // flush tl.buf, then exit the loop
+ stopping = true
+ case <-ticker.C:
+ }
var ready *bytes.Buffer
@@ -94,19 +98,20 @@ func (tl *ThrottledLogger) flusher() {
if ready != nil && ready.Len() > 0 {
tl.writer.Write(ready.Bytes())
}
-
- if stopping {
- break
- }
}
- close(tl.flusherDone)
+ close(tl.stopped)
}
// Close the flusher goroutine and wait for it to complete, then close the
// underlying Writer.
func (tl *ThrottledLogger) Close() error {
- tl.stop = true
- <-tl.flusherDone
+ select {
+ case <-tl.stopping:
+ // already stopped
+ default:
+ close(tl.stopping)
+ }
+ <-tl.stopped
return tl.writer.Close()
}
@@ -154,7 +159,8 @@ func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
// per second.
func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
tl := &ThrottledLogger{}
- tl.flusherDone = make(chan bool)
+ tl.stopping = make(chan struct{})
+ tl.stopped = make(chan struct{})
tl.writer = writer
tl.Logger = log.New(tl, "", 0)
tl.Timestamper = RFC3339Timestamp
commit 2ac112e2364b009166e0f67a3455fffb3cccc16c
Author: Tom Clegg <tom at curoverse.com>
Date: Sat Feb 25 01:51:20 2017 -0500
10777: Stop container if arv-mount dies before container exits.
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 0b59f7d..4709967 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -5,12 +5,6 @@ import (
"errors"
"flag"
"fmt"
- "git.curoverse.com/arvados.git/lib/crunchstat"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- "git.curoverse.com/arvados.git/sdk/go/manifest"
- "github.com/curoverse/dockerclient"
"io"
"io/ioutil"
"log"
@@ -24,6 +18,13 @@ import (
"sync"
"syscall"
"time"
+
+ "git.curoverse.com/arvados.git/lib/crunchstat"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "git.curoverse.com/arvados.git/sdk/go/manifest"
+ "github.com/curoverse/dockerclient"
)
// IArvadosClient is the minimal Arvados API methods used by crunch-run.
@@ -123,20 +124,29 @@ func (runner *ContainerRunner) SetupSignals() {
signal.Notify(runner.SigChan, syscall.SIGINT)
signal.Notify(runner.SigChan, syscall.SIGQUIT)
- go func(sig <-chan os.Signal) {
- for range sig {
- if !runner.Cancelled {
- runner.CancelLock.Lock()
- runner.Cancelled = true
- if runner.ContainerID != "" {
- runner.Docker.StopContainer(runner.ContainerID, 10)
- }
- runner.CancelLock.Unlock()
- }
- }
+ go func(sig chan os.Signal) {
+ <-sig
+ runner.stop()
+ signal.Stop(sig)
}(runner.SigChan)
}
+// stop the underlying Docker container.
+func (runner *ContainerRunner) stop() {
+ runner.CancelLock.Lock()
+ defer runner.CancelLock.Unlock()
+ if runner.Cancelled {
+ return
+ }
+ runner.Cancelled = true
+ if runner.ContainerID != "" {
+ err := runner.Docker.StopContainer(runner.ContainerID, 10)
+ if err != nil {
+ log.Printf("StopContainer failed: %s", err)
+ }
+ }
+}
+
// LoadImage determines the docker image id from the container record and
// checks if it is available in the local Docker image store. If not, it loads
// the image from Keep.
@@ -600,12 +610,22 @@ func (runner *ContainerRunner) StartContainer() error {
func (runner *ContainerRunner) WaitFinish() error {
runner.CrunchLog.Print("Waiting for container to finish")
- result := runner.Docker.Wait(runner.ContainerID)
- wr := <-result
- if wr.Error != nil {
- return fmt.Errorf("While waiting for container to finish: %v", wr.Error)
+ waitDocker := runner.Docker.Wait(runner.ContainerID)
+ waitMount := runner.ArvMountExit
+ for waitDocker != nil {
+ select {
+ case err := <-waitMount:
+ runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err)
+ waitMount = nil
+ runner.stop()
+ case wr := <-waitDocker:
+ if wr.Error != nil {
+ return fmt.Errorf("While waiting for container to finish: %v", wr.Error)
+ }
+ runner.ExitCode = &wr.ExitCode
+ waitDocker = nil
+ }
}
- runner.ExitCode = &wr.ExitCode
// wait for stdout/stderr to complete
<-runner.loggingDone
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index d1d2cee..eaf62d1 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/services/crunch-run/crunchrun_test.go
@@ -6,12 +6,6 @@ import (
"encoding/json"
"errors"
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- "git.curoverse.com/arvados.git/sdk/go/manifest"
- "github.com/curoverse/dockerclient"
- . "gopkg.in/check.v1"
"io"
"io/ioutil"
"os"
@@ -23,6 +17,13 @@ import (
"syscall"
"testing"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "git.curoverse.com/arvados.git/sdk/go/manifest"
+ "github.com/curoverse/dockerclient"
+ . "gopkg.in/check.v1"
)
// Gocheck boilerplate
@@ -717,7 +718,28 @@ func (s *TestSuite) TestFullRunSetCwd(c *C) {
c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "/bin\n"), Equals, true)
}
-func (s *TestSuite) TestCancel(c *C) {
+func (s *TestSuite) TestStopOnSignal(c *C) {
+ s.testStopContainer(c, func(cr *ContainerRunner) {
+ go func() {
+ for cr.ContainerID == "" {
+ time.Sleep(time.Millisecond)
+ }
+ cr.SigChan <- syscall.SIGINT
+ }()
+ })
+}
+
+func (s *TestSuite) TestStopOnArvMountDeath(c *C) {
+ s.testStopContainer(c, func(cr *ContainerRunner) {
+ cr.ArvMountExit = make(chan error)
+ go func() {
+ cr.ArvMountExit <- exec.Command("true").Run()
+ close(cr.ArvMountExit)
+ }()
+ })
+}
+
+func (s *TestSuite) testStopContainer(c *C, setup func(cr *ContainerRunner)) {
record := `{
"command": ["/bin/sh", "-c", "echo foo && sleep 30 && echo bar"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
@@ -744,30 +766,27 @@ func (s *TestSuite) TestCancel(c *C) {
api := &ArvTestClient{Container: rec}
cr := NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
- am := &ArvMountCmdLine{}
- cr.RunArvMount = am.ArvMountTest
+ cr.RunArvMount = func([]string, string) (*exec.Cmd, error) { return nil, nil }
+ setup(cr)
+ done := make(chan error)
go func() {
- for cr.ContainerID == "" {
- time.Sleep(time.Millisecond)
- }
- cr.SigChan <- syscall.SIGINT
+ done <- cr.Run()
}()
-
- err = cr.Run()
-
- c.Check(err, IsNil)
- if err != nil {
- for k, v := range api.Logs {
- c.Log(k)
- c.Log(v.String())
- }
+ select {
+ case <-time.After(20 * time.Second):
+ c.Fatal("timed out")
+ case err = <-done:
+ c.Check(err, IsNil)
+ }
+ for k, v := range api.Logs {
+ c.Log(k)
+ c.Log(v.String())
}
c.Check(api.CalledWith("container.log", nil), NotNil)
c.Check(api.CalledWith("container.state", "Cancelled"), NotNil)
c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "foo\n"), Equals, true)
-
}
func (s *TestSuite) TestFullRunSetEnv(c *C) {
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list