[ARVADOS] created: 1.1.0-122-g4750862
Git user
git at public.curoverse.com
Wed Nov 8 11:00:34 EST 2017
at 47508624a359de86a402030e67737e5e81e78947 (commit)
commit 47508624a359de86a402030e67737e5e81e78947
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Wed Nov 8 10:36:37 2017 -0500
12538: Put cleanup back into separate defer
* Always report unmount command
* Send kill signal to arv-mount on timeout (but don't wait for it)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index ead9184..27a548a 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -1255,26 +1255,41 @@ func (runner *ContainerRunner) CleanupDirs() {
if runner.ArvMount != nil {
var delay int64 = 8
umount := exec.Command("arv-mount", fmt.Sprintf("--unmount-timeout=%d", delay), "--unmount", runner.ArvMountPoint)
+ umount.Stdout = runner.CrunchLog
+ umount.Stderr = runner.CrunchLog
+ runner.CrunchLog.Printf("Running %v", umount.Args)
umnterr := umount.Start()
+
if umnterr != nil {
- runner.CrunchLog.Printf("Error running %v: %v", umount.Args, umnterr)
+ runner.CrunchLog.Printf("Error unmounting: %v", umnterr)
} else {
// If arv-mount --unmount gets stuck for any reason, we
// don't want to wait for it forever. Do Wait() in a goroutine
// so it doesn't block crunch-run.
+ umountExit := make(chan error)
go func() {
mnterr := umount.Wait()
if mnterr != nil {
- runner.CrunchLog.Printf("Error running %v: %v", umount.Args, mnterr)
+ runner.CrunchLog.Printf("Error unmounting: %v", mnterr)
}
+ umountExit <- mnterr
}()
- select {
- case <-runner.ArvMountExit:
- break
- case <-time.After(time.Duration((delay + 1) * int64(time.Second))):
- runner.CrunchLog.Printf("Timed out waiting for %v", umount.Args)
- umount.Process.Kill()
+ for again := true; again; {
+ again = false
+ select {
+ case <-umountExit:
+ umount = nil
+ again = true
+ case <-runner.ArvMountExit:
+ break
+ case <-time.After(time.Duration((delay + 1) * int64(time.Second))):
+ runner.CrunchLog.Printf("Timed out waiting for unmount")
+ if umount != nil {
+ umount.Process.Kill()
+ }
+ runner.ArvMount.Process.Kill()
+ }
}
}
}
@@ -1415,6 +1430,14 @@ func (runner *ContainerRunner) Run() (err error) {
runner.finalState = "Queued"
defer func() {
+ runner.stopSignals()
+ runner.CleanupDirs()
+
+ runner.CrunchLog.Printf("crunch-run finished")
+ runner.CrunchLog.Close()
+ }()
+
+ defer func() {
// checkErr prints e (unless it's nil) and sets err to
// e (unless err is already non-nil). Thus, if err
// hasn't already been assigned when Run() returns,
@@ -1437,21 +1460,20 @@ func (runner *ContainerRunner) Run() (err error) {
// Log the error encountered in Run(), if any
checkErr(err)
- if runner.finalState != "Queued" {
- if runner.IsCancelled() {
- runner.finalState = "Cancelled"
- }
- checkErr(runner.CaptureOutput())
+ if runner.finalState == "Queued" {
+ runner.UpdateContainerFinal()
+ return
}
+ if runner.IsCancelled() {
+ runner.finalState = "Cancelled"
+ // but don't return yet -- we still want to
+ // capture partial output and write logs
+ }
+
+ checkErr(runner.CaptureOutput())
checkErr(runner.CommitLogs())
checkErr(runner.UpdateContainerFinal())
-
- runner.stopSignals()
- runner.CleanupDirs()
-
- runner.CrunchLog.Printf("crunch-run finished")
- runner.CrunchLog.Close()
}()
err = runner.fetchContainerRecord()
commit e2bf56f0a0fa1f6b4fb7b4efc4db5178b074b8ce
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Tue Nov 7 10:14:40 2017 -0500
12538: Refactor crunch-run shutdown
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 557f30d..ead9184 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -221,7 +221,7 @@ func (runner *ContainerRunner) stop() {
}
}
-func (runner *ContainerRunner) teardown() {
+func (runner *ContainerRunner) stopSignals() {
if runner.SigChan != nil {
signal.Stop(runner.SigChan)
close(runner.SigChan)
@@ -358,8 +358,6 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
return fmt.Errorf("While creating keep mount temp dir: %v", err)
}
- runner.CleanupTempDir = append(runner.CleanupTempDir, runner.ArvMountPoint)
-
pdhOnly := true
tmpcount := 0
arvMountCmd := []string{
@@ -1255,15 +1253,15 @@ func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, b
func (runner *ContainerRunner) CleanupDirs() {
if runner.ArvMount != nil {
- delay := 8
+ var delay int64 = 8
umount := exec.Command("arv-mount", fmt.Sprintf("--unmount-timeout=%d", delay), "--unmount", runner.ArvMountPoint)
umnterr := umount.Start()
if umnterr != nil {
runner.CrunchLog.Printf("Error running %v: %v", umount.Args, umnterr)
} else {
// If arv-mount --unmount gets stuck for any reason, we
- // don't want to wait for it. Spin off the wait for
- // child process so it doesn't block crunch-run.
+ // don't want to wait for it forever. Do Wait() in a goroutine
+ // so it doesn't block crunch-run.
go func() {
mnterr := umount.Wait()
if mnterr != nil {
@@ -1271,23 +1269,23 @@ func (runner *ContainerRunner) CleanupDirs() {
}
}()
- timeout := time.NewTimer((delay + 1) * time.Second)
select {
case <-runner.ArvMountExit:
break
- case <-timeout.C:
+ case <-time.After(time.Duration((delay + 1) * int64(time.Second))):
runner.CrunchLog.Printf("Timed out waiting for %v", umount.Args)
+ umount.Process.Kill()
}
}
+ }
+
+ if runner.ArvMountPoint != "" {
if rmerr := os.Remove(runner.ArvMountPoint); rmerr != nil {
runner.CrunchLog.Printf("While cleaning up arv-mount directory %s: %v", runner.ArvMountPoint, rmerr)
}
}
for _, tmpdir := range runner.CleanupTempDir {
- if tmpdir == runner.ArvMountPoint {
- continue
- }
if rmerr := os.RemoveAll(tmpdir); rmerr != nil {
runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr)
}
@@ -1414,13 +1412,6 @@ func (runner *ContainerRunner) Run() (err error) {
runner.CrunchLog.Printf("Executing on host '%s'", hostname)
}
- // Clean up temporary directories _after_ finalizing
- // everything (if we've made any by then)
- defer func() {
- runner.CrunchLog.Printf("crunch-run finished")
- }()
- defer runner.CleanupDirs()
-
runner.finalState = "Queued"
defer func() {
@@ -1446,28 +1437,21 @@ func (runner *ContainerRunner) Run() (err error) {
// Log the error encountered in Run(), if any
checkErr(err)
- if runner.finalState == "Queued" {
- runner.CrunchLog.Close()
- runner.UpdateContainerFinal()
- return
- }
-
- if runner.IsCancelled() {
- runner.finalState = "Cancelled"
- // but don't return yet -- we still want to
- // capture partial output and write logs
+ if runner.finalState != "Queued" {
+ if runner.IsCancelled() {
+ runner.finalState = "Cancelled"
+ }
+ checkErr(runner.CaptureOutput())
}
- checkErr(runner.CaptureOutput())
checkErr(runner.CommitLogs())
checkErr(runner.UpdateContainerFinal())
- // The real log is already closed, but then we opened
- // a new one in case we needed to log anything while
- // finalizing.
- runner.CrunchLog.Close()
+ runner.stopSignals()
+ runner.CleanupDirs()
- runner.teardown()
+ runner.CrunchLog.Printf("crunch-run finished")
+ runner.CrunchLog.Close()
}()
err = runner.fetchContainerRecord()
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index 5992be0..bc0b312 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/services/crunch-run/crunchrun_test.go
@@ -670,7 +670,7 @@ func FullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn f
}
c.Check(api.WasSetRunning, Equals, true)
- c.Check(api.Content[api.Calls-1]["container"].(arvadosclient.Dict)["log"], NotNil)
+ c.Check(api.Content[api.Calls-2]["container"].(arvadosclient.Dict)["log"], NotNil)
if err != nil {
for k, v := range api.Logs {
@@ -1010,6 +1010,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
"--read-write", "--crunchstat-interval=5",
"--mount-by-pdh", "by_id", realTemp + "/keep1"})
c.Check(cr.Binds, DeepEquals, []string{realTemp + "/2:/tmp"})
+ os.RemoveAll(cr.ArvMountPoint)
cr.CleanupDirs()
checkEmpty()
}
@@ -1028,6 +1029,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
"--read-write", "--crunchstat-interval=5",
"--mount-by-pdh", "by_id", realTemp + "/keep1"})
c.Check(cr.Binds, DeepEquals, []string{realTemp + "/2:/out", realTemp + "/3:/tmp"})
+ os.RemoveAll(cr.ArvMountPoint)
cr.CleanupDirs()
checkEmpty()
}
@@ -1048,6 +1050,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
"--read-write", "--crunchstat-interval=5",
"--mount-by-pdh", "by_id", realTemp + "/keep1"})
c.Check(cr.Binds, DeepEquals, []string{realTemp + "/2:/tmp", stubCertPath + ":/etc/arvados/ca-certificates.crt:ro"})
+ os.RemoveAll(cr.ArvMountPoint)
cr.CleanupDirs()
checkEmpty()
@@ -1070,6 +1073,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
"--read-write", "--crunchstat-interval=5",
"--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
c.Check(cr.Binds, DeepEquals, []string{realTemp + "/keep1/tmp0:/keeptmp"})
+ os.RemoveAll(cr.ArvMountPoint)
cr.CleanupDirs()
checkEmpty()
}
@@ -1094,6 +1098,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
sort.StringSlice(cr.Binds).Sort()
c.Check(cr.Binds, DeepEquals, []string{realTemp + "/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53:/keepinp:ro",
realTemp + "/keep1/tmp0:/keepout"})
+ os.RemoveAll(cr.ArvMountPoint)
cr.CleanupDirs()
checkEmpty()
}
@@ -1119,6 +1124,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
sort.StringSlice(cr.Binds).Sort()
c.Check(cr.Binds, DeepEquals, []string{realTemp + "/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53:/keepinp:ro",
realTemp + "/keep1/tmp0:/keepout"})
+ os.RemoveAll(cr.ArvMountPoint)
cr.CleanupDirs()
checkEmpty()
}
@@ -1143,6 +1149,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
content, err := ioutil.ReadFile(realTemp + "/2/mountdata.json")
c.Check(err, IsNil)
c.Check(content, DeepEquals, []byte(test.out))
+ os.RemoveAll(cr.ArvMountPoint)
cr.CleanupDirs()
checkEmpty()
}
@@ -1166,6 +1173,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
"--read-write", "--crunchstat-interval=5",
"--file-cache", "512", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
c.Check(cr.Binds, DeepEquals, []string{realTemp + "/2:/tmp", realTemp + "/keep1/tmp0:/tmp/foo:ro"})
+ os.RemoveAll(cr.ArvMountPoint)
cr.CleanupDirs()
checkEmpty()
}
@@ -1184,6 +1192,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
err := cr.SetupMounts()
c.Check(err, NotNil)
c.Check(err, ErrorMatches, `Writable mount points are not permitted underneath the output_path.*`)
+ os.RemoveAll(cr.ArvMountPoint)
cr.CleanupDirs()
checkEmpty()
}
@@ -1202,6 +1211,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
err := cr.SetupMounts()
c.Check(err, NotNil)
c.Check(err, ErrorMatches, `Only mount points of kind 'collection' are supported underneath the output_path.*`)
+ os.RemoveAll(cr.ArvMountPoint)
cr.CleanupDirs()
checkEmpty()
}
@@ -1218,6 +1228,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
err := cr.SetupMounts()
c.Check(err, NotNil)
c.Check(err, ErrorMatches, `Unsupported mount kind 'tmp' for stdin.*`)
+ os.RemoveAll(cr.ArvMountPoint)
cr.CleanupDirs()
checkEmpty()
}
commit 3b278b8959a80103506470457485f523dcbfba50
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Mon Nov 6 16:41:05 2017 -0500
12538: Refactor and assume arv-mount --unmount does all the hard work.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 7080ca2..557f30d 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -1253,34 +1253,30 @@ func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, b
return extracted.Text, nil
}
-func (runner *ContainerRunner) tryUnmount(umount *exec.Cmd) error {
- umnterr := umount.Start()
- if umnterr != nil {
- runner.CrunchLog.Printf("Error: %v", umnterr)
- }
- go func() {
- mnterr := umount.Wait()
- if mnterr != nil {
- runner.CrunchLog.Printf("Error running %v: %v", umount.Args, mnterr)
- }
- }()
-
- timeout := time.NewTimer(9 * time.Second)
- select {
- case <-runner.ArvMountExit:
- return nil
- case <-timeout.C:
- return fmt.Errorf("Timed out")
- }
-}
-
func (runner *ContainerRunner) CleanupDirs() {
if runner.ArvMount != nil {
- if err := runner.tryUnmount(exec.Command("fusermount", "-u", runner.ArvMountPoint)); err != nil {
- runner.CrunchLog.Printf("arv-mount not ended, will try force unmount: %v", err)
- err = runner.tryUnmount(exec.Command("arv-mount", "--unmount-timeout=8", "--unmount", runner.ArvMountPoint))
- if err != nil {
- runner.CrunchLog.Printf("Error running arv-mount --unmount: %v", err)
+ delay := 8
+ umount := exec.Command("arv-mount", fmt.Sprintf("--unmount-timeout=%d", delay), "--unmount", runner.ArvMountPoint)
+ umnterr := umount.Start()
+ if umnterr != nil {
+ runner.CrunchLog.Printf("Error running %v: %v", umount.Args, umnterr)
+ } else {
+ // If arv-mount --unmount gets stuck for any reason, we
+ // don't want to wait for it. Spin off the wait for
+ // child process so it doesn't block crunch-run.
+ go func() {
+ mnterr := umount.Wait()
+ if mnterr != nil {
+ runner.CrunchLog.Printf("Error running %v: %v", umount.Args, mnterr)
+ }
+ }()
+
+ timeout := time.NewTimer((delay + 1) * time.Second)
+ select {
+ case <-runner.ArvMountExit:
+ break
+ case <-timeout.C:
+ runner.CrunchLog.Printf("Timed out waiting for %v", umount.Args)
}
}
if rmerr := os.Remove(runner.ArvMountPoint); rmerr != nil {
commit ec11b576da48f0272121f77268cef39a54c9bb7b
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Fri Nov 3 16:14:42 2017 -0400
12538: Refactor force-unmount behavior.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 9e05489..7080ca2 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -183,7 +183,6 @@ type ContainerRunner struct {
enableNetwork string // one of "default" or "always"
networkMode string // passed through to HostConfig.NetworkMode
arvMountLog *ThrottledLogger
- arvMountKill func()
}
// setupSignals sets up signal handling to gracefully terminate the underlying
@@ -309,10 +308,6 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (
return nil, err
}
- runner.arvMountKill = func() {
- c.Process.Kill()
- }
-
statReadme := make(chan bool)
runner.ArvMountExit = make(chan error)
@@ -1258,38 +1253,46 @@ func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, b
return extracted.Text, nil
}
+func (runner *ContainerRunner) tryUnmount(umount *exec.Cmd) error {
+ umnterr := umount.Start()
+ if umnterr != nil {
+ runner.CrunchLog.Printf("Error: %v", umnterr)
+ }
+ go func() {
+ mnterr := umount.Wait()
+ if mnterr != nil {
+ runner.CrunchLog.Printf("Error running %v: %v", umount.Args, mnterr)
+ }
+ }()
+
+ timeout := time.NewTimer(9 * time.Second)
+ select {
+ case <-runner.ArvMountExit:
+ return nil
+ case <-timeout.C:
+ return fmt.Errorf("Timed out")
+ }
+}
+
func (runner *ContainerRunner) CleanupDirs() {
if runner.ArvMount != nil {
- var umount *exec.Cmd
- umount = exec.Command("fusermount", "-u", "-z", runner.ArvMountPoint)
- done := false
- try := 1
- for !done {
- umnterr := umount.Run()
- if umnterr != nil {
- runner.CrunchLog.Printf("Error: %v", umnterr)
- }
- timeout := time.NewTimer(10 * time.Second)
- select {
- case <-runner.ArvMountExit:
- done = true
- case <-timeout.C:
- if try == 1 {
- runner.CrunchLog.Printf("Timeout waiting for arv-mount to end. Will force unmount.")
- umount = exec.Command("arv-mount", "--unmount-timeout=10", "--unmount", runner.ArvMountPoint)
- try = 2
- } else {
- runner.CrunchLog.Printf("Killing arv-mount")
- runner.arvMountKill()
- umount = exec.Command("fusermount", "-u", "-z", runner.ArvMountPoint)
- }
+ if err := runner.tryUnmount(exec.Command("fusermount", "-u", runner.ArvMountPoint)); err != nil {
+ runner.CrunchLog.Printf("arv-mount not ended, will try force unmount: %v", err)
+ err = runner.tryUnmount(exec.Command("arv-mount", "--unmount-timeout=8", "--unmount", runner.ArvMountPoint))
+ if err != nil {
+ runner.CrunchLog.Printf("Error running arv-mount --unmount: %v", err)
}
}
+ if rmerr := os.Remove(runner.ArvMountPoint); rmerr != nil {
+ runner.CrunchLog.Printf("While cleaning up arv-mount directory %s: %v", runner.ArvMountPoint, rmerr)
+ }
}
for _, tmpdir := range runner.CleanupTempDir {
- rmerr := os.RemoveAll(tmpdir)
- if rmerr != nil {
+ if tmpdir == runner.ArvMountPoint {
+ continue
+ }
+ if rmerr := os.RemoveAll(tmpdir); rmerr != nil {
runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr)
}
}
commit fff3b19b8a7b8eca06065bdf60b0541c26e27935
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Fri Nov 3 15:04:00 2017 -0400
12538: Fix tests.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 55edb99..9e05489 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -1299,7 +1299,9 @@ func (runner *ContainerRunner) CleanupDirs() {
func (runner *ContainerRunner) CommitLogs() error {
runner.CrunchLog.Print(runner.finalState)
- runner.arvMountLog.Close()
+ if runner.arvMountLog != nil {
+ runner.arvMountLog.Close()
+ }
runner.CrunchLog.Close()
// Closing CrunchLog above allows them to be committed to Keep at this
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index d3c9990..5992be0 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/services/crunch-run/crunchrun_test.go
@@ -131,7 +131,7 @@ func (t *TestDockerClient) ContainerCreate(ctx context.Context, config *dockerco
func (t *TestDockerClient) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error {
if container == "abcde" {
- go t.fn(t)
+ // t.fn gets executed in ContainerWait
return nil
} else {
return errors.New("Invalid container id")
@@ -147,6 +147,7 @@ func (t *TestDockerClient) ContainerWait(ctx context.Context, container string,
body := make(chan dockercontainer.ContainerWaitOKBody)
err := make(chan error)
go func() {
+ t.fn(t)
body <- dockercontainer.ContainerWaitOKBody{StatusCode: int64(t.finish)}
close(body)
close(err)
@@ -1002,10 +1003,12 @@ func (s *TestSuite) TestSetupMounts(c *C) {
cr.Container.Mounts = make(map[string]arvados.Mount)
cr.Container.Mounts["/tmp"] = arvados.Mount{Kind: "tmp"}
cr.OutputPath = "/tmp"
-
+ cr.statInterval = 5 * time.Second
err := cr.SetupMounts()
c.Check(err, IsNil)
- c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
+ c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other",
+ "--read-write", "--crunchstat-interval=5",
+ "--mount-by-pdh", "by_id", realTemp + "/keep1"})
c.Check(cr.Binds, DeepEquals, []string{realTemp + "/2:/tmp"})
cr.CleanupDirs()
checkEmpty()
@@ -1021,7 +1024,9 @@ func (s *TestSuite) TestSetupMounts(c *C) {
err := cr.SetupMounts()
c.Check(err, IsNil)
- c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
+ c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other",
+ "--read-write", "--crunchstat-interval=5",
+ "--mount-by-pdh", "by_id", realTemp + "/keep1"})
c.Check(cr.Binds, DeepEquals, []string{realTemp + "/2:/out", realTemp + "/3:/tmp"})
cr.CleanupDirs()
checkEmpty()
@@ -1039,7 +1044,9 @@ func (s *TestSuite) TestSetupMounts(c *C) {
err := cr.SetupMounts()
c.Check(err, IsNil)
- c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
+ c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other",
+ "--read-write", "--crunchstat-interval=5",
+ "--mount-by-pdh", "by_id", realTemp + "/keep1"})
c.Check(cr.Binds, DeepEquals, []string{realTemp + "/2:/tmp", stubCertPath + ":/etc/arvados/ca-certificates.crt:ro"})
cr.CleanupDirs()
checkEmpty()
@@ -1059,7 +1066,9 @@ func (s *TestSuite) TestSetupMounts(c *C) {
err := cr.SetupMounts()
c.Check(err, IsNil)
- c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
+ c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other",
+ "--read-write", "--crunchstat-interval=5",
+ "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
c.Check(cr.Binds, DeepEquals, []string{realTemp + "/keep1/tmp0:/keeptmp"})
cr.CleanupDirs()
checkEmpty()
@@ -1079,7 +1088,9 @@ func (s *TestSuite) TestSetupMounts(c *C) {
err := cr.SetupMounts()
c.Check(err, IsNil)
- c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
+ c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other",
+ "--read-write", "--crunchstat-interval=5",
+ "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
sort.StringSlice(cr.Binds).Sort()
c.Check(cr.Binds, DeepEquals, []string{realTemp + "/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53:/keepinp:ro",
realTemp + "/keep1/tmp0:/keepout"})
@@ -1102,7 +1113,9 @@ func (s *TestSuite) TestSetupMounts(c *C) {
err := cr.SetupMounts()
c.Check(err, IsNil)
- c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--file-cache", "512", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
+ c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other",
+ "--read-write", "--crunchstat-interval=5",
+ "--file-cache", "512", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
sort.StringSlice(cr.Binds).Sort()
c.Check(cr.Binds, DeepEquals, []string{realTemp + "/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53:/keepinp:ro",
realTemp + "/keep1/tmp0:/keepout"})
@@ -1149,7 +1162,9 @@ func (s *TestSuite) TestSetupMounts(c *C) {
err := cr.SetupMounts()
c.Check(err, IsNil)
- c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--file-cache", "512", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
+ c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other",
+ "--read-write", "--crunchstat-interval=5",
+ "--file-cache", "512", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
c.Check(cr.Binds, DeepEquals, []string{realTemp + "/2:/tmp", realTemp + "/keep1/tmp0:/tmp/foo:ro"})
cr.CleanupDirs()
checkEmpty()
commit 3c5de241f6a6ac56e8bf986c89ffe153b9d941fe
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Fri Nov 3 11:34:46 2017 -0400
12538: Make handling of arv-mount process more robust
* If the initial "fusermount -u -z" of the arv-mount directory times out, use "arv-mount --unmount"
* If that doesn't work, send SIGKILL to arv-mount and then run "fusermount -u -z" again
* Refactor "stop container if arv-mount terminates" behavior
* Make arv-mount logging channel work as intended and enable crunchstat for arv-mount.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index a424088..55edb99 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -182,6 +182,7 @@ type ContainerRunner struct {
enableNetwork string // one of "default" or "always"
networkMode string // passed through to HostConfig.NetworkMode
+ arvMountLog *ThrottledLogger
arvMountKill func()
}
@@ -194,7 +195,10 @@ func (runner *ContainerRunner) setupSignals() {
signal.Notify(runner.SigChan, syscall.SIGQUIT)
go func(sig chan os.Signal) {
- <-sig
+ s := <-sig
+ if s != nil {
+ runner.CrunchLog.Printf("Caught signal %v", s)
+ }
runner.stop()
}(runner.SigChan)
}
@@ -211,8 +215,10 @@ func (runner *ContainerRunner) stop() {
timeout := time.Duration(10)
err := runner.Docker.ContainerStop(context.TODO(), runner.ContainerID, &(timeout))
if err != nil {
- log.Printf("StopContainer failed: %s", err)
+ runner.CrunchLog.Printf("StopContainer failed: %s", err)
}
+ // Suppress multiple calls to stop()
+ runner.cStarted = false
}
}
@@ -292,9 +298,11 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (
}
c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token)
- nt := NewThrottledLogger(runner.NewLogWriter("arv-mount"))
- c.Stdout = nt
- c.Stderr = nt
+ runner.arvMountLog = NewThrottledLogger(runner.NewLogWriter("arv-mount"))
+ c.Stdout = runner.arvMountLog
+ c.Stderr = runner.arvMountLog
+
+ runner.CrunchLog.Printf("Running %v", c.Args)
err = c.Start()
if err != nil {
@@ -322,7 +330,11 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (
}()
go func() {
- runner.ArvMountExit <- c.Wait()
+ mnterr := c.Wait()
+ if mnterr != nil {
+ runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr)
+ }
+ runner.ArvMountExit <- mnterr
close(runner.ArvMountExit)
}()
@@ -355,7 +367,11 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
pdhOnly := true
tmpcount := 0
- arvMountCmd := []string{"--foreground", "--allow-other", "--read-write"}
+ arvMountCmd := []string{
+ "--foreground",
+ "--allow-other",
+ "--read-write",
+ fmt.Sprintf("--crunchstat-interval=%v", runner.statInterval.Seconds())}
if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
@@ -896,12 +912,23 @@ func (runner *ContainerRunner) WaitFinish() (err error) {
waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, "not-running")
+ go func() {
+ <-runner.ArvMountExit
+ if runner.cStarted {
+ runner.CrunchLog.Printf("arv-mount exited while container is still running. Stopping container.")
+ runner.stop()
+ }
+ }()
+
var waitBody dockercontainer.ContainerWaitOKBody
select {
case waitBody = <-waitOk:
case err = <-waitErr:
}
+ // Container isn't running any more
+ runner.cStarted = false
+
if err != nil {
return fmt.Errorf("container wait: %v", err)
}
@@ -910,15 +937,6 @@ func (runner *ContainerRunner) WaitFinish() (err error) {
code := int(waitBody.StatusCode)
runner.ExitCode = &code
- waitMount := runner.ArvMountExit
- select {
- case err = <-waitMount:
- runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err)
- waitMount = nil
- runner.stop()
- default:
- }
-
// wait for stdout/stderr to complete
<-runner.loggingDone
@@ -1242,25 +1260,29 @@ func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, b
func (runner *ContainerRunner) CleanupDirs() {
if runner.ArvMount != nil {
- //umount := exec.Command("fusermount", "-u", runner.ArvMountPoint)
- umount := exec.Command("sleep", "1")
- umnterr := umount.Run()
- if umnterr != nil {
- log.Printf("While running fusermount: %v", umnterr)
- }
- timeout := time.NewTimer(10 * time.Second)
- select {
- case mnterr := <-runner.ArvMountExit:
- if mnterr != nil {
- log.Printf("Arv-mount exit error: %v", mnterr)
- }
- case <-timeout.C:
- log.Printf("Timeout waiting for arv-mount to end. Killing arv-mount.")
- runner.arvMountKill()
- umount = exec.Command("arv-mount", "--unmount-timeout=10", "--unmount", runner.ArvMountPoint)
- umnterr = umount.Run()
+ var umount *exec.Cmd
+ umount = exec.Command("fusermount", "-u", "-z", runner.ArvMountPoint)
+ done := false
+ try := 1
+ for !done {
+ umnterr := umount.Run()
if umnterr != nil {
- log.Printf("While running arv-mount --unmount: %v", umnterr)
+ runner.CrunchLog.Printf("Error: %v", umnterr)
+ }
+ timeout := time.NewTimer(10 * time.Second)
+ select {
+ case <-runner.ArvMountExit:
+ done = true
+ case <-timeout.C:
+ if try == 1 {
+ runner.CrunchLog.Printf("Timeout waiting for arv-mount to end. Will force unmount.")
+ umount = exec.Command("arv-mount", "--unmount-timeout=10", "--unmount", runner.ArvMountPoint)
+ try = 2
+ } else {
+ runner.CrunchLog.Printf("Killing arv-mount")
+ runner.arvMountKill()
+ umount = exec.Command("fusermount", "-u", "-z", runner.ArvMountPoint)
+ }
}
}
}
@@ -1276,14 +1298,17 @@ func (runner *ContainerRunner) CleanupDirs() {
// CommitLogs posts the collection containing the final container logs.
func (runner *ContainerRunner) CommitLogs() error {
runner.CrunchLog.Print(runner.finalState)
+
+ runner.arvMountLog.Close()
runner.CrunchLog.Close()
- // Closing CrunchLog above allows it to be committed to Keep at this
+ // Closing CrunchLog above allows them to be committed to Keep at this
// point, but re-open crunch log with ArvClient in case there are any
- // other further (such as failing to write the log to Keep!) while
- // shutting down
+ // other further errors (such as failing to write the log to Keep!)
+ // while shutting down
runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient,
UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil})
+ runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
if runner.LogsPDH != nil {
// If we have already assigned something to LogsPDH,
@@ -1370,8 +1395,11 @@ func (runner *ContainerRunner) IsCancelled() bool {
// NewArvLogWriter creates an ArvLogWriter
func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
- return &ArvLogWriter{ArvClient: runner.ArvClient, UUID: runner.Container.UUID, loggingStream: name,
- writeCloser: runner.LogCollection.Open(name + ".txt")}
+ return &ArvLogWriter{
+ ArvClient: runner.ArvClient,
+ UUID: runner.Container.UUID,
+ loggingStream: name,
+ writeCloser: runner.LogCollection.Open(name + ".txt")}
}
// Run the full container lifecycle.
@@ -1387,6 +1415,9 @@ func (runner *ContainerRunner) Run() (err error) {
// Clean up temporary directories _after_ finalizing
// everything (if we've made any by then)
+ defer func() {
+ runner.CrunchLog.Printf("crunch-run finished")
+ }()
defer runner.CleanupDirs()
runner.finalState = "Queued"
diff --git a/services/crunch-run/upload.go b/services/crunch-run/upload.go
index 95925e5..b54e336 100644
--- a/services/crunch-run/upload.go
+++ b/services/crunch-run/upload.go
@@ -18,14 +18,15 @@ import (
"crypto/md5"
"errors"
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- "git.curoverse.com/arvados.git/sdk/go/manifest"
"io"
"log"
"os"
"path/filepath"
"strings"
"sync"
+
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "git.curoverse.com/arvados.git/sdk/go/manifest"
)
// Block is a data block in a manifest stream
commit 899d369bef489b89d9ce1b1cd5e07ce8304a9a85
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Thu Nov 2 17:52:58 2017 -0400
Force kill arv-mount (work in progress)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 3678bd8..a424088 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -182,6 +182,7 @@ type ContainerRunner struct {
enableNetwork string // one of "default" or "always"
networkMode string // passed through to HostConfig.NetworkMode
+ arvMountKill func()
}
// setupSignals sets up signal handling to gracefully terminate the underlying
@@ -300,6 +301,10 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (
return nil, err
}
+ runner.arvMountKill = func() {
+ c.Process.Kill()
+ }
+
statReadme := make(chan bool)
runner.ArvMountExit = make(chan error)
@@ -1237,15 +1242,26 @@ func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, b
func (runner *ContainerRunner) CleanupDirs() {
if runner.ArvMount != nil {
- umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
+ //umount := exec.Command("fusermount", "-u", runner.ArvMountPoint)
+ umount := exec.Command("sleep", "1")
umnterr := umount.Run()
if umnterr != nil {
- runner.CrunchLog.Printf("While running fusermount: %v", umnterr)
+ log.Printf("While running fusermount: %v", umnterr)
}
-
- mnterr := <-runner.ArvMountExit
- if mnterr != nil {
- runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr)
+ timeout := time.NewTimer(10 * time.Second)
+ select {
+ case mnterr := <-runner.ArvMountExit:
+ if mnterr != nil {
+ log.Printf("Arv-mount exit error: %v", mnterr)
+ }
+ case <-timeout.C:
+ log.Printf("Timeout waiting for arv-mount to end. Killing arv-mount.")
+ runner.arvMountKill()
+ umount = exec.Command("arv-mount", "--unmount-timeout=10", "--unmount", runner.ArvMountPoint)
+ umnterr = umount.Run()
+ if umnterr != nil {
+ log.Printf("While running arv-mount --unmount: %v", umnterr)
+ }
}
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list