[ARVADOS] created: 1.1.0-122-g4750862

Git user git at public.curoverse.com
Wed Nov 8 11:00:34 EST 2017


        at  47508624a359de86a402030e67737e5e81e78947 (commit)


commit 47508624a359de86a402030e67737e5e81e78947
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Wed Nov 8 10:36:37 2017 -0500

    12538: Put cleanup back into separate defer
    
    * Always report unmount command
    
    * Send kill signal to arv-mount on timeout (but don't wait for it)
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index ead9184..27a548a 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -1255,26 +1255,41 @@ func (runner *ContainerRunner) CleanupDirs() {
 	if runner.ArvMount != nil {
 		var delay int64 = 8
 		umount := exec.Command("arv-mount", fmt.Sprintf("--unmount-timeout=%d", delay), "--unmount", runner.ArvMountPoint)
+		umount.Stdout = runner.CrunchLog
+		umount.Stderr = runner.CrunchLog
+		runner.CrunchLog.Printf("Running %v", umount.Args)
 		umnterr := umount.Start()
+
 		if umnterr != nil {
-			runner.CrunchLog.Printf("Error running %v: %v", umount.Args, umnterr)
+			runner.CrunchLog.Printf("Error unmounting: %v", umnterr)
 		} else {
 			// If arv-mount --unmount gets stuck for any reason, we
 			// don't want to wait for it forever.  Do Wait() in a goroutine
 			// so it doesn't block crunch-run.
+			umountExit := make(chan error)
 			go func() {
 				mnterr := umount.Wait()
 				if mnterr != nil {
-					runner.CrunchLog.Printf("Error running %v: %v", umount.Args, mnterr)
+					runner.CrunchLog.Printf("Error unmounting: %v", mnterr)
 				}
+				umountExit <- mnterr
 			}()
 
-			select {
-			case <-runner.ArvMountExit:
-				break
-			case <-time.After(time.Duration((delay + 1) * int64(time.Second))):
-				runner.CrunchLog.Printf("Timed out waiting for %v", umount.Args)
-				umount.Process.Kill()
+			for again := true; again; {
+				again = false
+				select {
+				case <-umountExit:
+					umount = nil
+					again = true
+				case <-runner.ArvMountExit:
+					break
+				case <-time.After(time.Duration((delay + 1) * int64(time.Second))):
+					runner.CrunchLog.Printf("Timed out waiting for unmount")
+					if umount != nil {
+						umount.Process.Kill()
+					}
+					runner.ArvMount.Process.Kill()
+				}
 			}
 		}
 	}
@@ -1415,6 +1430,14 @@ func (runner *ContainerRunner) Run() (err error) {
 	runner.finalState = "Queued"
 
 	defer func() {
+		runner.stopSignals()
+		runner.CleanupDirs()
+
+		runner.CrunchLog.Printf("crunch-run finished")
+		runner.CrunchLog.Close()
+	}()
+
+	defer func() {
 		// checkErr prints e (unless it's nil) and sets err to
 		// e (unless err is already non-nil). Thus, if err
 		// hasn't already been assigned when Run() returns,
@@ -1437,21 +1460,20 @@ func (runner *ContainerRunner) Run() (err error) {
 		// Log the error encountered in Run(), if any
 		checkErr(err)
 
-		if runner.finalState != "Queued" {
-			if runner.IsCancelled() {
-				runner.finalState = "Cancelled"
-			}
-			checkErr(runner.CaptureOutput())
+		if runner.finalState == "Queued" {
+			runner.UpdateContainerFinal()
+			return
 		}
 
+		if runner.IsCancelled() {
+			runner.finalState = "Cancelled"
+			// but don't return yet -- we still want to
+			// capture partial output and write logs
+		}
+
+		checkErr(runner.CaptureOutput())
 		checkErr(runner.CommitLogs())
 		checkErr(runner.UpdateContainerFinal())
-
-		runner.stopSignals()
-		runner.CleanupDirs()
-
-		runner.CrunchLog.Printf("crunch-run finished")
-		runner.CrunchLog.Close()
 	}()
 
 	err = runner.fetchContainerRecord()

commit e2bf56f0a0fa1f6b4fb7b4efc4db5178b074b8ce
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Tue Nov 7 10:14:40 2017 -0500

    12538: Refactor crunch-run shutdown
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 557f30d..ead9184 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -221,7 +221,7 @@ func (runner *ContainerRunner) stop() {
 	}
 }
 
-func (runner *ContainerRunner) teardown() {
+func (runner *ContainerRunner) stopSignals() {
 	if runner.SigChan != nil {
 		signal.Stop(runner.SigChan)
 		close(runner.SigChan)
@@ -358,8 +358,6 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 		return fmt.Errorf("While creating keep mount temp dir: %v", err)
 	}
 
-	runner.CleanupTempDir = append(runner.CleanupTempDir, runner.ArvMountPoint)
-
 	pdhOnly := true
 	tmpcount := 0
 	arvMountCmd := []string{
@@ -1255,15 +1253,15 @@ func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, b
 
 func (runner *ContainerRunner) CleanupDirs() {
 	if runner.ArvMount != nil {
-		delay := 8
+		var delay int64 = 8
 		umount := exec.Command("arv-mount", fmt.Sprintf("--unmount-timeout=%d", delay), "--unmount", runner.ArvMountPoint)
 		umnterr := umount.Start()
 		if umnterr != nil {
 			runner.CrunchLog.Printf("Error running %v: %v", umount.Args, umnterr)
 		} else {
 			// If arv-mount --unmount gets stuck for any reason, we
-			// don't want to wait for it.  Spin off the wait for
-			// child process so it doesn't block crunch-run.
+			// don't want to wait for it forever.  Do Wait() in a goroutine
+			// so it doesn't block crunch-run.
 			go func() {
 				mnterr := umount.Wait()
 				if mnterr != nil {
@@ -1271,23 +1269,23 @@ func (runner *ContainerRunner) CleanupDirs() {
 				}
 			}()
 
-			timeout := time.NewTimer((delay + 1) * time.Second)
 			select {
 			case <-runner.ArvMountExit:
 				break
-			case <-timeout.C:
+			case <-time.After(time.Duration((delay + 1) * int64(time.Second))):
 				runner.CrunchLog.Printf("Timed out waiting for %v", umount.Args)
+				umount.Process.Kill()
 			}
 		}
+	}
+
+	if runner.ArvMountPoint != "" {
 		if rmerr := os.Remove(runner.ArvMountPoint); rmerr != nil {
 			runner.CrunchLog.Printf("While cleaning up arv-mount directory %s: %v", runner.ArvMountPoint, rmerr)
 		}
 	}
 
 	for _, tmpdir := range runner.CleanupTempDir {
-		if tmpdir == runner.ArvMountPoint {
-			continue
-		}
 		if rmerr := os.RemoveAll(tmpdir); rmerr != nil {
 			runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr)
 		}
@@ -1414,13 +1412,6 @@ func (runner *ContainerRunner) Run() (err error) {
 		runner.CrunchLog.Printf("Executing on host '%s'", hostname)
 	}
 
-	// Clean up temporary directories _after_ finalizing
-	// everything (if we've made any by then)
-	defer func() {
-		runner.CrunchLog.Printf("crunch-run finished")
-	}()
-	defer runner.CleanupDirs()
-
 	runner.finalState = "Queued"
 
 	defer func() {
@@ -1446,28 +1437,21 @@ func (runner *ContainerRunner) Run() (err error) {
 		// Log the error encountered in Run(), if any
 		checkErr(err)
 
-		if runner.finalState == "Queued" {
-			runner.CrunchLog.Close()
-			runner.UpdateContainerFinal()
-			return
-		}
-
-		if runner.IsCancelled() {
-			runner.finalState = "Cancelled"
-			// but don't return yet -- we still want to
-			// capture partial output and write logs
+		if runner.finalState != "Queued" {
+			if runner.IsCancelled() {
+				runner.finalState = "Cancelled"
+			}
+			checkErr(runner.CaptureOutput())
 		}
 
-		checkErr(runner.CaptureOutput())
 		checkErr(runner.CommitLogs())
 		checkErr(runner.UpdateContainerFinal())
 
-		// The real log is already closed, but then we opened
-		// a new one in case we needed to log anything while
-		// finalizing.
-		runner.CrunchLog.Close()
+		runner.stopSignals()
+		runner.CleanupDirs()
 
-		runner.teardown()
+		runner.CrunchLog.Printf("crunch-run finished")
+		runner.CrunchLog.Close()
 	}()
 
 	err = runner.fetchContainerRecord()
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index 5992be0..bc0b312 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/services/crunch-run/crunchrun_test.go
@@ -670,7 +670,7 @@ func FullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn f
 	}
 	c.Check(api.WasSetRunning, Equals, true)
 
-	c.Check(api.Content[api.Calls-1]["container"].(arvadosclient.Dict)["log"], NotNil)
+	c.Check(api.Content[api.Calls-2]["container"].(arvadosclient.Dict)["log"], NotNil)
 
 	if err != nil {
 		for k, v := range api.Logs {
@@ -1010,6 +1010,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 			"--read-write", "--crunchstat-interval=5",
 			"--mount-by-pdh", "by_id", realTemp + "/keep1"})
 		c.Check(cr.Binds, DeepEquals, []string{realTemp + "/2:/tmp"})
+		os.RemoveAll(cr.ArvMountPoint)
 		cr.CleanupDirs()
 		checkEmpty()
 	}
@@ -1028,6 +1029,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 			"--read-write", "--crunchstat-interval=5",
 			"--mount-by-pdh", "by_id", realTemp + "/keep1"})
 		c.Check(cr.Binds, DeepEquals, []string{realTemp + "/2:/out", realTemp + "/3:/tmp"})
+		os.RemoveAll(cr.ArvMountPoint)
 		cr.CleanupDirs()
 		checkEmpty()
 	}
@@ -1048,6 +1050,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 			"--read-write", "--crunchstat-interval=5",
 			"--mount-by-pdh", "by_id", realTemp + "/keep1"})
 		c.Check(cr.Binds, DeepEquals, []string{realTemp + "/2:/tmp", stubCertPath + ":/etc/arvados/ca-certificates.crt:ro"})
+		os.RemoveAll(cr.ArvMountPoint)
 		cr.CleanupDirs()
 		checkEmpty()
 
@@ -1070,6 +1073,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 			"--read-write", "--crunchstat-interval=5",
 			"--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
 		c.Check(cr.Binds, DeepEquals, []string{realTemp + "/keep1/tmp0:/keeptmp"})
+		os.RemoveAll(cr.ArvMountPoint)
 		cr.CleanupDirs()
 		checkEmpty()
 	}
@@ -1094,6 +1098,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 		sort.StringSlice(cr.Binds).Sort()
 		c.Check(cr.Binds, DeepEquals, []string{realTemp + "/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53:/keepinp:ro",
 			realTemp + "/keep1/tmp0:/keepout"})
+		os.RemoveAll(cr.ArvMountPoint)
 		cr.CleanupDirs()
 		checkEmpty()
 	}
@@ -1119,6 +1124,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 		sort.StringSlice(cr.Binds).Sort()
 		c.Check(cr.Binds, DeepEquals, []string{realTemp + "/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53:/keepinp:ro",
 			realTemp + "/keep1/tmp0:/keepout"})
+		os.RemoveAll(cr.ArvMountPoint)
 		cr.CleanupDirs()
 		checkEmpty()
 	}
@@ -1143,6 +1149,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 		content, err := ioutil.ReadFile(realTemp + "/2/mountdata.json")
 		c.Check(err, IsNil)
 		c.Check(content, DeepEquals, []byte(test.out))
+		os.RemoveAll(cr.ArvMountPoint)
 		cr.CleanupDirs()
 		checkEmpty()
 	}
@@ -1166,6 +1173,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 			"--read-write", "--crunchstat-interval=5",
 			"--file-cache", "512", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
 		c.Check(cr.Binds, DeepEquals, []string{realTemp + "/2:/tmp", realTemp + "/keep1/tmp0:/tmp/foo:ro"})
+		os.RemoveAll(cr.ArvMountPoint)
 		cr.CleanupDirs()
 		checkEmpty()
 	}
@@ -1184,6 +1192,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 		err := cr.SetupMounts()
 		c.Check(err, NotNil)
 		c.Check(err, ErrorMatches, `Writable mount points are not permitted underneath the output_path.*`)
+		os.RemoveAll(cr.ArvMountPoint)
 		cr.CleanupDirs()
 		checkEmpty()
 	}
@@ -1202,6 +1211,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 		err := cr.SetupMounts()
 		c.Check(err, NotNil)
 		c.Check(err, ErrorMatches, `Only mount points of kind 'collection' are supported underneath the output_path.*`)
+		os.RemoveAll(cr.ArvMountPoint)
 		cr.CleanupDirs()
 		checkEmpty()
 	}
@@ -1218,6 +1228,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 		err := cr.SetupMounts()
 		c.Check(err, NotNil)
 		c.Check(err, ErrorMatches, `Unsupported mount kind 'tmp' for stdin.*`)
+		os.RemoveAll(cr.ArvMountPoint)
 		cr.CleanupDirs()
 		checkEmpty()
 	}

commit 3b278b8959a80103506470457485f523dcbfba50
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Mon Nov 6 16:41:05 2017 -0500

    12538: Refactor and assume arv-mount --unmount does all the hard work.
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 7080ca2..557f30d 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -1253,34 +1253,30 @@ func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, b
 	return extracted.Text, nil
 }
 
-func (runner *ContainerRunner) tryUnmount(umount *exec.Cmd) error {
-	umnterr := umount.Start()
-	if umnterr != nil {
-		runner.CrunchLog.Printf("Error: %v", umnterr)
-	}
-	go func() {
-		mnterr := umount.Wait()
-		if mnterr != nil {
-			runner.CrunchLog.Printf("Error running %v: %v", umount.Args, mnterr)
-		}
-	}()
-
-	timeout := time.NewTimer(9 * time.Second)
-	select {
-	case <-runner.ArvMountExit:
-		return nil
-	case <-timeout.C:
-		return fmt.Errorf("Timed out")
-	}
-}
-
 func (runner *ContainerRunner) CleanupDirs() {
 	if runner.ArvMount != nil {
-		if err := runner.tryUnmount(exec.Command("fusermount", "-u", runner.ArvMountPoint)); err != nil {
-			runner.CrunchLog.Printf("arv-mount not ended, will try force unmount: %v", err)
-			err = runner.tryUnmount(exec.Command("arv-mount", "--unmount-timeout=8", "--unmount", runner.ArvMountPoint))
-			if err != nil {
-				runner.CrunchLog.Printf("Error running arv-mount --unmount: %v", err)
+		delay := 8
+		umount := exec.Command("arv-mount", fmt.Sprintf("--unmount-timeout=%d", delay), "--unmount", runner.ArvMountPoint)
+		umnterr := umount.Start()
+		if umnterr != nil {
+			runner.CrunchLog.Printf("Error running %v: %v", umount.Args, umnterr)
+		} else {
+			// If arv-mount --unmount gets stuck for any reason, we
+			// don't want to wait for it.  Spin off the wait for
+			// child process so it doesn't block crunch-run.
+			go func() {
+				mnterr := umount.Wait()
+				if mnterr != nil {
+					runner.CrunchLog.Printf("Error running %v: %v", umount.Args, mnterr)
+				}
+			}()
+
+			timeout := time.NewTimer((delay + 1) * time.Second)
+			select {
+			case <-runner.ArvMountExit:
+				break
+			case <-timeout.C:
+				runner.CrunchLog.Printf("Timed out waiting for %v", umount.Args)
 			}
 		}
 		if rmerr := os.Remove(runner.ArvMountPoint); rmerr != nil {

commit ec11b576da48f0272121f77268cef39a54c9bb7b
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Fri Nov 3 16:14:42 2017 -0400

    12538: Refactor force-unmount behavior.
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 9e05489..7080ca2 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -183,7 +183,6 @@ type ContainerRunner struct {
 	enableNetwork string // one of "default" or "always"
 	networkMode   string // passed through to HostConfig.NetworkMode
 	arvMountLog   *ThrottledLogger
-	arvMountKill  func()
 }
 
 // setupSignals sets up signal handling to gracefully terminate the underlying
@@ -309,10 +308,6 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (
 		return nil, err
 	}
 
-	runner.arvMountKill = func() {
-		c.Process.Kill()
-	}
-
 	statReadme := make(chan bool)
 	runner.ArvMountExit = make(chan error)
 
@@ -1258,38 +1253,46 @@ func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, b
 	return extracted.Text, nil
 }
 
+func (runner *ContainerRunner) tryUnmount(umount *exec.Cmd) error {
+	umnterr := umount.Start()
+	if umnterr != nil {
+		runner.CrunchLog.Printf("Error: %v", umnterr)
+	}
+	go func() {
+		mnterr := umount.Wait()
+		if mnterr != nil {
+			runner.CrunchLog.Printf("Error running %v: %v", umount.Args, mnterr)
+		}
+	}()
+
+	timeout := time.NewTimer(9 * time.Second)
+	select {
+	case <-runner.ArvMountExit:
+		return nil
+	case <-timeout.C:
+		return fmt.Errorf("Timed out")
+	}
+}
+
 func (runner *ContainerRunner) CleanupDirs() {
 	if runner.ArvMount != nil {
-		var umount *exec.Cmd
-		umount = exec.Command("fusermount", "-u", "-z", runner.ArvMountPoint)
-		done := false
-		try := 1
-		for !done {
-			umnterr := umount.Run()
-			if umnterr != nil {
-				runner.CrunchLog.Printf("Error: %v", umnterr)
-			}
-			timeout := time.NewTimer(10 * time.Second)
-			select {
-			case <-runner.ArvMountExit:
-				done = true
-			case <-timeout.C:
-				if try == 1 {
-					runner.CrunchLog.Printf("Timeout waiting for arv-mount to end.  Will force unmount.")
-					umount = exec.Command("arv-mount", "--unmount-timeout=10", "--unmount", runner.ArvMountPoint)
-					try = 2
-				} else {
-					runner.CrunchLog.Printf("Killing arv-mount")
-					runner.arvMountKill()
-					umount = exec.Command("fusermount", "-u", "-z", runner.ArvMountPoint)
-				}
+		if err := runner.tryUnmount(exec.Command("fusermount", "-u", runner.ArvMountPoint)); err != nil {
+			runner.CrunchLog.Printf("arv-mount not ended, will try force unmount: %v", err)
+			err = runner.tryUnmount(exec.Command("arv-mount", "--unmount-timeout=8", "--unmount", runner.ArvMountPoint))
+			if err != nil {
+				runner.CrunchLog.Printf("Error running arv-mount --unmount: %v", err)
 			}
 		}
+		if rmerr := os.Remove(runner.ArvMountPoint); rmerr != nil {
+			runner.CrunchLog.Printf("While cleaning up arv-mount directory %s: %v", runner.ArvMountPoint, rmerr)
+		}
 	}
 
 	for _, tmpdir := range runner.CleanupTempDir {
-		rmerr := os.RemoveAll(tmpdir)
-		if rmerr != nil {
+		if tmpdir == runner.ArvMountPoint {
+			continue
+		}
+		if rmerr := os.RemoveAll(tmpdir); rmerr != nil {
 			runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr)
 		}
 	}

commit fff3b19b8a7b8eca06065bdf60b0541c26e27935
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Fri Nov 3 15:04:00 2017 -0400

    12538: Fix tests.
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 55edb99..9e05489 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -1299,7 +1299,9 @@ func (runner *ContainerRunner) CleanupDirs() {
 func (runner *ContainerRunner) CommitLogs() error {
 	runner.CrunchLog.Print(runner.finalState)
 
-	runner.arvMountLog.Close()
+	if runner.arvMountLog != nil {
+		runner.arvMountLog.Close()
+	}
 	runner.CrunchLog.Close()
 
 	// Closing CrunchLog above allows them to be committed to Keep at this
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index d3c9990..5992be0 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/services/crunch-run/crunchrun_test.go
@@ -131,7 +131,7 @@ func (t *TestDockerClient) ContainerCreate(ctx context.Context, config *dockerco
 
 func (t *TestDockerClient) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error {
 	if container == "abcde" {
-		go t.fn(t)
+		// t.fn gets executed in ContainerWait
 		return nil
 	} else {
 		return errors.New("Invalid container id")
@@ -147,6 +147,7 @@ func (t *TestDockerClient) ContainerWait(ctx context.Context, container string,
 	body := make(chan dockercontainer.ContainerWaitOKBody)
 	err := make(chan error)
 	go func() {
+		t.fn(t)
 		body <- dockercontainer.ContainerWaitOKBody{StatusCode: int64(t.finish)}
 		close(body)
 		close(err)
@@ -1002,10 +1003,12 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 		cr.Container.Mounts = make(map[string]arvados.Mount)
 		cr.Container.Mounts["/tmp"] = arvados.Mount{Kind: "tmp"}
 		cr.OutputPath = "/tmp"
-
+		cr.statInterval = 5 * time.Second
 		err := cr.SetupMounts()
 		c.Check(err, IsNil)
-		c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
+		c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other",
+			"--read-write", "--crunchstat-interval=5",
+			"--mount-by-pdh", "by_id", realTemp + "/keep1"})
 		c.Check(cr.Binds, DeepEquals, []string{realTemp + "/2:/tmp"})
 		cr.CleanupDirs()
 		checkEmpty()
@@ -1021,7 +1024,9 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 
 		err := cr.SetupMounts()
 		c.Check(err, IsNil)
-		c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
+		c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other",
+			"--read-write", "--crunchstat-interval=5",
+			"--mount-by-pdh", "by_id", realTemp + "/keep1"})
 		c.Check(cr.Binds, DeepEquals, []string{realTemp + "/2:/out", realTemp + "/3:/tmp"})
 		cr.CleanupDirs()
 		checkEmpty()
@@ -1039,7 +1044,9 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 
 		err := cr.SetupMounts()
 		c.Check(err, IsNil)
-		c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
+		c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other",
+			"--read-write", "--crunchstat-interval=5",
+			"--mount-by-pdh", "by_id", realTemp + "/keep1"})
 		c.Check(cr.Binds, DeepEquals, []string{realTemp + "/2:/tmp", stubCertPath + ":/etc/arvados/ca-certificates.crt:ro"})
 		cr.CleanupDirs()
 		checkEmpty()
@@ -1059,7 +1066,9 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 
 		err := cr.SetupMounts()
 		c.Check(err, IsNil)
-		c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
+		c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other",
+			"--read-write", "--crunchstat-interval=5",
+			"--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
 		c.Check(cr.Binds, DeepEquals, []string{realTemp + "/keep1/tmp0:/keeptmp"})
 		cr.CleanupDirs()
 		checkEmpty()
@@ -1079,7 +1088,9 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 
 		err := cr.SetupMounts()
 		c.Check(err, IsNil)
-		c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
+		c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other",
+			"--read-write", "--crunchstat-interval=5",
+			"--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
 		sort.StringSlice(cr.Binds).Sort()
 		c.Check(cr.Binds, DeepEquals, []string{realTemp + "/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53:/keepinp:ro",
 			realTemp + "/keep1/tmp0:/keepout"})
@@ -1102,7 +1113,9 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 
 		err := cr.SetupMounts()
 		c.Check(err, IsNil)
-		c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--file-cache", "512", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
+		c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other",
+			"--read-write", "--crunchstat-interval=5",
+			"--file-cache", "512", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
 		sort.StringSlice(cr.Binds).Sort()
 		c.Check(cr.Binds, DeepEquals, []string{realTemp + "/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53:/keepinp:ro",
 			realTemp + "/keep1/tmp0:/keepout"})
@@ -1149,7 +1162,9 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 
 		err := cr.SetupMounts()
 		c.Check(err, IsNil)
-		c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--file-cache", "512", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
+		c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other",
+			"--read-write", "--crunchstat-interval=5",
+			"--file-cache", "512", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
 		c.Check(cr.Binds, DeepEquals, []string{realTemp + "/2:/tmp", realTemp + "/keep1/tmp0:/tmp/foo:ro"})
 		cr.CleanupDirs()
 		checkEmpty()

commit 3c5de241f6a6ac56e8bf986c89ffe153b9d941fe
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Fri Nov 3 11:34:46 2017 -0400

    12538: Make handling of arv-mount process more robust
    
    * If initial "fusermount -u -z" of arv-mount dir times out, use "arv-mount --unmount"
    
    * If that doesn't work, send SIGKILL to arv-mount and then run
      "fusermount -u -z" again
    
    * Refactor "stop container if arv-mount terminates" behavior
    
    * Make arv-mount logging channel work as intended and enable crunchstat for arv-mount.
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index a424088..55edb99 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -182,6 +182,7 @@ type ContainerRunner struct {
 
 	enableNetwork string // one of "default" or "always"
 	networkMode   string // passed through to HostConfig.NetworkMode
+	arvMountLog   *ThrottledLogger
 	arvMountKill  func()
 }
 
@@ -194,7 +195,10 @@ func (runner *ContainerRunner) setupSignals() {
 	signal.Notify(runner.SigChan, syscall.SIGQUIT)
 
 	go func(sig chan os.Signal) {
-		<-sig
+		s := <-sig
+		if s != nil {
+			runner.CrunchLog.Printf("Caught signal %v", s)
+		}
 		runner.stop()
 	}(runner.SigChan)
 }
@@ -211,8 +215,10 @@ func (runner *ContainerRunner) stop() {
 		timeout := time.Duration(10)
 		err := runner.Docker.ContainerStop(context.TODO(), runner.ContainerID, &(timeout))
 		if err != nil {
-			log.Printf("StopContainer failed: %s", err)
+			runner.CrunchLog.Printf("StopContainer failed: %s", err)
 		}
+		// Suppress multiple calls to stop()
+		runner.cStarted = false
 	}
 }
 
@@ -292,9 +298,11 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (
 	}
 	c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token)
 
-	nt := NewThrottledLogger(runner.NewLogWriter("arv-mount"))
-	c.Stdout = nt
-	c.Stderr = nt
+	runner.arvMountLog = NewThrottledLogger(runner.NewLogWriter("arv-mount"))
+	c.Stdout = runner.arvMountLog
+	c.Stderr = runner.arvMountLog
+
+	runner.CrunchLog.Printf("Running %v", c.Args)
 
 	err = c.Start()
 	if err != nil {
@@ -322,7 +330,11 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (
 	}()
 
 	go func() {
-		runner.ArvMountExit <- c.Wait()
+		mnterr := c.Wait()
+		if mnterr != nil {
+			runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr)
+		}
+		runner.ArvMountExit <- mnterr
 		close(runner.ArvMountExit)
 	}()
 
@@ -355,7 +367,11 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 
 	pdhOnly := true
 	tmpcount := 0
-	arvMountCmd := []string{"--foreground", "--allow-other", "--read-write"}
+	arvMountCmd := []string{
+		"--foreground",
+		"--allow-other",
+		"--read-write",
+		fmt.Sprintf("--crunchstat-interval=%v", runner.statInterval.Seconds())}
 
 	if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
 		arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
@@ -896,12 +912,23 @@ func (runner *ContainerRunner) WaitFinish() (err error) {
 
 	waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, "not-running")
 
+	go func() {
+		<-runner.ArvMountExit
+		if runner.cStarted {
+			runner.CrunchLog.Printf("arv-mount exited while container is still running.  Stopping container.")
+			runner.stop()
+		}
+	}()
+
 	var waitBody dockercontainer.ContainerWaitOKBody
 	select {
 	case waitBody = <-waitOk:
 	case err = <-waitErr:
 	}
 
+	// Container isn't running any more
+	runner.cStarted = false
+
 	if err != nil {
 		return fmt.Errorf("container wait: %v", err)
 	}
@@ -910,15 +937,6 @@ func (runner *ContainerRunner) WaitFinish() (err error) {
 	code := int(waitBody.StatusCode)
 	runner.ExitCode = &code
 
-	waitMount := runner.ArvMountExit
-	select {
-	case err = <-waitMount:
-		runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err)
-		waitMount = nil
-		runner.stop()
-	default:
-	}
-
 	// wait for stdout/stderr to complete
 	<-runner.loggingDone
 
@@ -1242,25 +1260,29 @@ func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, b
 
 func (runner *ContainerRunner) CleanupDirs() {
 	if runner.ArvMount != nil {
-		//umount := exec.Command("fusermount", "-u", runner.ArvMountPoint)
-		umount := exec.Command("sleep", "1")
-		umnterr := umount.Run()
-		if umnterr != nil {
-			log.Printf("While running fusermount: %v", umnterr)
-		}
-		timeout := time.NewTimer(10 * time.Second)
-		select {
-		case mnterr := <-runner.ArvMountExit:
-			if mnterr != nil {
-				log.Printf("Arv-mount exit error: %v", mnterr)
-			}
-		case <-timeout.C:
-			log.Printf("Timeout waiting for arv-mount to end.  Killing arv-mount.")
-			runner.arvMountKill()
-			umount = exec.Command("arv-mount", "--unmount-timeout=10", "--unmount", runner.ArvMountPoint)
-			umnterr = umount.Run()
+		var umount *exec.Cmd
+		umount = exec.Command("fusermount", "-u", "-z", runner.ArvMountPoint)
+		done := false
+		try := 1
+		for !done {
+			umnterr := umount.Run()
 			if umnterr != nil {
-				log.Printf("While running arv-mount --unmount: %v", umnterr)
+				runner.CrunchLog.Printf("Error: %v", umnterr)
+			}
+			timeout := time.NewTimer(10 * time.Second)
+			select {
+			case <-runner.ArvMountExit:
+				done = true
+			case <-timeout.C:
+				if try == 1 {
+					runner.CrunchLog.Printf("Timeout waiting for arv-mount to end.  Will force unmount.")
+					umount = exec.Command("arv-mount", "--unmount-timeout=10", "--unmount", runner.ArvMountPoint)
+					try = 2
+				} else {
+					runner.CrunchLog.Printf("Killing arv-mount")
+					runner.arvMountKill()
+					umount = exec.Command("fusermount", "-u", "-z", runner.ArvMountPoint)
+				}
 			}
 		}
 	}
@@ -1276,14 +1298,17 @@ func (runner *ContainerRunner) CleanupDirs() {
 // CommitLogs posts the collection containing the final container logs.
 func (runner *ContainerRunner) CommitLogs() error {
 	runner.CrunchLog.Print(runner.finalState)
+
+	runner.arvMountLog.Close()
 	runner.CrunchLog.Close()
 
-	// Closing CrunchLog above allows it to be committed to Keep at this
+	// Closing CrunchLog above allows them to be committed to Keep at this
 	// point, but re-open crunch log with ArvClient in case there are any
-	// other further (such as failing to write the log to Keep!) while
-	// shutting down
+	// other further errors (such as failing to write the log to Keep!)
+	// while shutting down
 	runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient,
 		UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil})
+	runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
 
 	if runner.LogsPDH != nil {
 		// If we have already assigned something to LogsPDH,
@@ -1370,8 +1395,11 @@ func (runner *ContainerRunner) IsCancelled() bool {
 
 // NewArvLogWriter creates an ArvLogWriter
 func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
-	return &ArvLogWriter{ArvClient: runner.ArvClient, UUID: runner.Container.UUID, loggingStream: name,
-		writeCloser: runner.LogCollection.Open(name + ".txt")}
+	return &ArvLogWriter{
+		ArvClient:     runner.ArvClient,
+		UUID:          runner.Container.UUID,
+		loggingStream: name,
+		writeCloser:   runner.LogCollection.Open(name + ".txt")}
 }
 
 // Run the full container lifecycle.
@@ -1387,6 +1415,9 @@ func (runner *ContainerRunner) Run() (err error) {
 
 	// Clean up temporary directories _after_ finalizing
 	// everything (if we've made any by then)
+	defer func() {
+		runner.CrunchLog.Printf("crunch-run finished")
+	}()
 	defer runner.CleanupDirs()
 
 	runner.finalState = "Queued"
diff --git a/services/crunch-run/upload.go b/services/crunch-run/upload.go
index 95925e5..b54e336 100644
--- a/services/crunch-run/upload.go
+++ b/services/crunch-run/upload.go
@@ -18,14 +18,15 @@ import (
 	"crypto/md5"
 	"errors"
 	"fmt"
-	"git.curoverse.com/arvados.git/sdk/go/keepclient"
-	"git.curoverse.com/arvados.git/sdk/go/manifest"
 	"io"
 	"log"
 	"os"
 	"path/filepath"
 	"strings"
 	"sync"
+
+	"git.curoverse.com/arvados.git/sdk/go/keepclient"
+	"git.curoverse.com/arvados.git/sdk/go/manifest"
 )
 
 // Block is a data block in a manifest stream

commit 899d369bef489b89d9ce1b1cd5e07ce8304a9a85
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Thu Nov 2 17:52:58 2017 -0400

    Force-kill arv-mount (WIP)
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 3678bd8..a424088 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -182,6 +182,7 @@ type ContainerRunner struct {
 
 	enableNetwork string // one of "default" or "always"
 	networkMode   string // passed through to HostConfig.NetworkMode
+	arvMountKill  func()
 }
 
 // setupSignals sets up signal handling to gracefully terminate the underlying
@@ -300,6 +301,10 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (
 		return nil, err
 	}
 
+	runner.arvMountKill = func() {
+		c.Process.Kill()
+	}
+
 	statReadme := make(chan bool)
 	runner.ArvMountExit = make(chan error)
 
@@ -1237,15 +1242,26 @@ func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, b
 
 func (runner *ContainerRunner) CleanupDirs() {
 	if runner.ArvMount != nil {
-		umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
+		//umount := exec.Command("fusermount", "-u", runner.ArvMountPoint)
+		umount := exec.Command("sleep", "1")
 		umnterr := umount.Run()
 		if umnterr != nil {
-			runner.CrunchLog.Printf("While running fusermount: %v", umnterr)
+			log.Printf("While running fusermount: %v", umnterr)
 		}
-
-		mnterr := <-runner.ArvMountExit
-		if mnterr != nil {
-			runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr)
+		timeout := time.NewTimer(10 * time.Second)
+		select {
+		case mnterr := <-runner.ArvMountExit:
+			if mnterr != nil {
+				log.Printf("Arv-mount exit error: %v", mnterr)
+			}
+		case <-timeout.C:
+			log.Printf("Timeout waiting for arv-mount to end.  Killing arv-mount.")
+			runner.arvMountKill()
+			umount = exec.Command("arv-mount", "--unmount-timeout=10", "--unmount", runner.ArvMountPoint)
+			umnterr = umount.Run()
+			if umnterr != nil {
+				log.Printf("While running arv-mount --unmount: %v", umnterr)
+			}
 		}
 	}
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list