[ARVADOS] updated: c106221bf8c90fd80ca2c19a578fd7faec985229

Git user git at public.curoverse.com
Sat Feb 25 02:03:53 EST 2017


Summary of changes:
 services/crunch-run/crunchrun.go      | 10 +++---
 services/crunch-run/crunchrun_test.go | 65 ++++++++++++++++++++++-------------
 services/crunch-run/logging.go        | 38 +++++++++++---------
 3 files changed, 69 insertions(+), 44 deletions(-)

  discards  1042f200521bed5c9bb658d24afc442d552a76fa (commit)
       via  c106221bf8c90fd80ca2c19a578fd7faec985229 (commit)
       via  2ac112e2364b009166e0f67a3455fffb3cccc16c (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the old revision is not an ancestor of the new revision.  This
situation occurs when you force-push (`git push --force`) a change and
generate a repository history like this:

 * -- * -- B -- O -- O -- O (1042f200521bed5c9bb658d24afc442d552a76fa)
            \
             N -- N -- N (c106221bf8c90fd80ca2c19a578fd7faec985229)

When this happens, we assume that you have already received alert emails for
all of the O revisions, so here we report only the revisions in the N
branch since the common base, B.

The revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit c106221bf8c90fd80ca2c19a578fd7faec985229
Author: Tom Clegg <tom at curoverse.com>
Date:   Sat Feb 25 01:56:53 2017 -0500

    10777: Close and flush logs right away instead of waiting for next tick.

diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go
index 4f8f95c..5254ff6 100644
--- a/services/crunch-run/logging.go
+++ b/services/crunch-run/logging.go
@@ -4,11 +4,12 @@ import (
 	"bufio"
 	"bytes"
 	"fmt"
-	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"io"
 	"log"
 	"sync"
 	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 )
 
 // Timestamper is the signature for a function that takes a timestamp and
@@ -31,9 +32,9 @@ type ThrottledLogger struct {
 	*log.Logger
 	buf *bytes.Buffer
 	sync.Mutex
-	writer      io.WriteCloser
-	stop        bool
-	flusherDone chan bool
+	writer   io.WriteCloser
+	stopping chan struct{}
+	stopped  chan struct{}
 	Timestamper
 	Immediate *log.Logger
 }
@@ -80,10 +81,13 @@ func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
 func (tl *ThrottledLogger) flusher() {
 	ticker := time.NewTicker(time.Second)
 	defer ticker.Stop()
-	for range ticker.C {
-		// We use a separate "stopping" var here to ensure we flush
-		// tl.buf after tl.stop becomes true.
-		stopping := tl.stop
+	for stopping := false; !stopping; {
+		select {
+		case <-tl.stopping:
+			// flush tl.buf, then exit the loop
+			stopping = true
+		case <-ticker.C:
+		}
 
 		var ready *bytes.Buffer
 
@@ -94,19 +98,20 @@ func (tl *ThrottledLogger) flusher() {
 		if ready != nil && ready.Len() > 0 {
 			tl.writer.Write(ready.Bytes())
 		}
-
-		if stopping {
-			break
-		}
 	}
-	close(tl.flusherDone)
+	close(tl.stopped)
 }
 
 // Close the flusher goroutine and wait for it to complete, then close the
 // underlying Writer.
 func (tl *ThrottledLogger) Close() error {
-	tl.stop = true
-	<-tl.flusherDone
+	select {
+	case <-tl.stopping:
+		// already stopped
+	default:
+		close(tl.stopping)
+	}
+	<-tl.stopped
 	return tl.writer.Close()
 }
 
@@ -154,7 +159,8 @@ func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
 // per second.
 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
 	tl := &ThrottledLogger{}
-	tl.flusherDone = make(chan bool)
+	tl.stopping = make(chan struct{})
+	tl.stopped = make(chan struct{})
 	tl.writer = writer
 	tl.Logger = log.New(tl, "", 0)
 	tl.Timestamper = RFC3339Timestamp

commit 2ac112e2364b009166e0f67a3455fffb3cccc16c
Author: Tom Clegg <tom at curoverse.com>
Date:   Sat Feb 25 01:51:20 2017 -0500

    10777: Stop container if arv-mount dies before container exits.

diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 0b59f7d..4709967 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -5,12 +5,6 @@ import (
 	"errors"
 	"flag"
 	"fmt"
-	"git.curoverse.com/arvados.git/lib/crunchstat"
-	"git.curoverse.com/arvados.git/sdk/go/arvados"
-	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-	"git.curoverse.com/arvados.git/sdk/go/keepclient"
-	"git.curoverse.com/arvados.git/sdk/go/manifest"
-	"github.com/curoverse/dockerclient"
 	"io"
 	"io/ioutil"
 	"log"
@@ -24,6 +18,13 @@ import (
 	"sync"
 	"syscall"
 	"time"
+
+	"git.curoverse.com/arvados.git/lib/crunchstat"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/keepclient"
+	"git.curoverse.com/arvados.git/sdk/go/manifest"
+	"github.com/curoverse/dockerclient"
 )
 
 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
@@ -123,20 +124,29 @@ func (runner *ContainerRunner) SetupSignals() {
 	signal.Notify(runner.SigChan, syscall.SIGINT)
 	signal.Notify(runner.SigChan, syscall.SIGQUIT)
 
-	go func(sig <-chan os.Signal) {
-		for range sig {
-			if !runner.Cancelled {
-				runner.CancelLock.Lock()
-				runner.Cancelled = true
-				if runner.ContainerID != "" {
-					runner.Docker.StopContainer(runner.ContainerID, 10)
-				}
-				runner.CancelLock.Unlock()
-			}
-		}
+	go func(sig chan os.Signal) {
+		<-sig
+		runner.stop()
+		signal.Stop(sig)
 	}(runner.SigChan)
 }
 
+// stop the underlying Docker container.
+func (runner *ContainerRunner) stop() {
+	runner.CancelLock.Lock()
+	defer runner.CancelLock.Unlock()
+	if runner.Cancelled {
+		return
+	}
+	runner.Cancelled = true
+	if runner.ContainerID != "" {
+		err := runner.Docker.StopContainer(runner.ContainerID, 10)
+		if err != nil {
+			log.Printf("StopContainer failed: %s", err)
+		}
+	}
+}
+
 // LoadImage determines the docker image id from the container record and
 // checks if it is available in the local Docker image store.  If not, it loads
 // the image from Keep.
@@ -600,12 +610,22 @@ func (runner *ContainerRunner) StartContainer() error {
 func (runner *ContainerRunner) WaitFinish() error {
 	runner.CrunchLog.Print("Waiting for container to finish")
 
-	result := runner.Docker.Wait(runner.ContainerID)
-	wr := <-result
-	if wr.Error != nil {
-		return fmt.Errorf("While waiting for container to finish: %v", wr.Error)
+	waitDocker := runner.Docker.Wait(runner.ContainerID)
+	waitMount := runner.ArvMountExit
+	for waitDocker != nil {
+		select {
+		case err := <-waitMount:
+			runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err)
+			waitMount = nil
+			runner.stop()
+		case wr := <-waitDocker:
+			if wr.Error != nil {
+				return fmt.Errorf("While waiting for container to finish: %v", wr.Error)
+			}
+			runner.ExitCode = &wr.ExitCode
+			waitDocker = nil
+		}
 	}
-	runner.ExitCode = &wr.ExitCode
 
 	// wait for stdout/stderr to complete
 	<-runner.loggingDone
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index d1d2cee..eaf62d1 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/services/crunch-run/crunchrun_test.go
@@ -6,12 +6,6 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
-	"git.curoverse.com/arvados.git/sdk/go/arvados"
-	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-	"git.curoverse.com/arvados.git/sdk/go/keepclient"
-	"git.curoverse.com/arvados.git/sdk/go/manifest"
-	"github.com/curoverse/dockerclient"
-	. "gopkg.in/check.v1"
 	"io"
 	"io/ioutil"
 	"os"
@@ -23,6 +17,13 @@ import (
 	"syscall"
 	"testing"
 	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/keepclient"
+	"git.curoverse.com/arvados.git/sdk/go/manifest"
+	"github.com/curoverse/dockerclient"
+	. "gopkg.in/check.v1"
 )
 
 // Gocheck boilerplate
@@ -717,7 +718,28 @@ func (s *TestSuite) TestFullRunSetCwd(c *C) {
 	c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "/bin\n"), Equals, true)
 }
 
-func (s *TestSuite) TestCancel(c *C) {
+func (s *TestSuite) TestStopOnSignal(c *C) {
+	s.testStopContainer(c, func(cr *ContainerRunner) {
+		go func() {
+			for cr.ContainerID == "" {
+				time.Sleep(time.Millisecond)
+			}
+			cr.SigChan <- syscall.SIGINT
+		}()
+	})
+}
+
+func (s *TestSuite) TestStopOnArvMountDeath(c *C) {
+	s.testStopContainer(c, func(cr *ContainerRunner) {
+		cr.ArvMountExit = make(chan error)
+		go func() {
+			cr.ArvMountExit <- exec.Command("true").Run()
+			close(cr.ArvMountExit)
+		}()
+	})
+}
+
+func (s *TestSuite) testStopContainer(c *C, setup func(cr *ContainerRunner)) {
 	record := `{
     "command": ["/bin/sh", "-c", "echo foo && sleep 30 && echo bar"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
@@ -744,30 +766,27 @@ func (s *TestSuite) TestCancel(c *C) {
 
 	api := &ArvTestClient{Container: rec}
 	cr := NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
-	am := &ArvMountCmdLine{}
-	cr.RunArvMount = am.ArvMountTest
+	cr.RunArvMount = func([]string, string) (*exec.Cmd, error) { return nil, nil }
+	setup(cr)
 
+	done := make(chan error)
 	go func() {
-		for cr.ContainerID == "" {
-			time.Sleep(time.Millisecond)
-		}
-		cr.SigChan <- syscall.SIGINT
+		done <- cr.Run()
 	}()
-
-	err = cr.Run()
-
-	c.Check(err, IsNil)
-	if err != nil {
-		for k, v := range api.Logs {
-			c.Log(k)
-			c.Log(v.String())
-		}
+	select {
+	case <-time.After(20 * time.Second):
+		c.Fatal("timed out")
+	case err = <-done:
+		c.Check(err, IsNil)
+	}
+	for k, v := range api.Logs {
+		c.Log(k)
+		c.Log(v.String())
 	}
 
 	c.Check(api.CalledWith("container.log", nil), NotNil)
 	c.Check(api.CalledWith("container.state", "Cancelled"), NotNil)
 	c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "foo\n"), Equals, true)
-
 }
 
 func (s *TestSuite) TestFullRunSetEnv(c *C) {

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list