[arvados] updated: 2.1.0-2637-gdc70bbf9e

git repository hosting git at public.arvados.org
Wed Jul 6 19:07:43 UTC 2022


Summary of changes:
 cmd/arvados-client/container_gateway.go      |  5 +-
 cmd/arvados-client/container_gateway_test.go |  9 +++
 cmd/arvados-server/cmd.go                    |  2 +
 lib/controller/localdb/container_gateway.go  |  2 +-
 lib/controller/rpc/conn.go                   |  2 +-
 lib/crunchrun/container_gateway.go           | 87 ++++++++++++++++++++++------
 lib/crunchrun/crunchrun.go                   |  4 ++
 sdk/go/arvados/container_gateway.go          | 22 ++++---
 8 files changed, 104 insertions(+), 29 deletions(-)

       via  dc70bbf9ea15395476107a3b8dff96f754a40998 (commit)
       via  091ae55fc1df3ec50490becd437d512e38b0f972 (commit)
       via  a42604972cccf8dd9c8341c260927a6c48c62b84 (commit)
       via  817ee84de15cdc960990e86af8ce705073fcafba (commit)
       via  31a270c68c24b7de994655ee788478a64b6bdfb7 (commit)
       via  bad3a575113abaf7a232f85dd15c4d1e1b60c0e2 (commit)
       via  3e27af4fa076222a18d602091060b9d11ee2079c (commit)
       via  49f99d7b9a8706c48baedba3659d7ca98b02dde3 (commit)
      from  87f3da84318306184165dae50f75ac6721d89285 (commit)

The revisions listed above that are new to this repository have
not appeared in any other notification email, so we list them in
full below.


commit dc70bbf9ea15395476107a3b8dff96f754a40998
Author: Tom Clegg <tom at curii.com>
Date:   Wed Jul 6 15:04:55 2022 -0400

    19166: Remove debug log.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/crunchrun/container_gateway.go b/lib/crunchrun/container_gateway.go
index 1002de733..3cb93fc74 100644
--- a/lib/crunchrun/container_gateway.go
+++ b/lib/crunchrun/container_gateway.go
@@ -564,7 +564,7 @@ func (gw *Gateway) handleSession(ctx context.Context, newch ssh.NewChannel, deta
 			// would be a gaping security
 			// hole).
 		default:
-			fmt.Fprintf(logw, "declined request %q on ssh channel"+eol, req.Type)
+			// fmt.Fprintf(logw, "declined request %q on ssh channel"+eol, req.Type)
 		}
 		if req.WantReply {
 			req.Reply(ok, nil)

commit 091ae55fc1df3ec50490becd437d512e38b0f972
Author: Tom Clegg <tom at curii.com>
Date:   Wed Jul 6 15:02:20 2022 -0400

    19166: Test for hung-waiting-for-stdin bug.
    
    (The test fails on the main branch and passes here with the StdinPipe
    fix from the previous commit.)
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/cmd/arvados-client/container_gateway_test.go b/cmd/arvados-client/container_gateway_test.go
index f4a140c40..743b91d69 100644
--- a/cmd/arvados-client/container_gateway_test.go
+++ b/cmd/arvados-client/container_gateway_test.go
@@ -25,6 +25,7 @@ import (
 	"git.arvados.org/arvados.git/lib/crunchrun"
 	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"git.arvados.org/arvados.git/sdk/go/arvadostest"
+	"git.arvados.org/arvados.git/sdk/go/ctxlog"
 	"git.arvados.org/arvados.git/sdk/go/httpserver"
 	check "gopkg.in/check.v1"
 )
@@ -53,6 +54,7 @@ func (s *ClientSuite) TestShellGateway(c *check.C) {
 		ContainerUUID: uuid,
 		Address:       "0.0.0.0:0",
 		AuthSecret:    authSecret,
+		Log:           ctxlog.TestLogger(c),
 		// Just forward connections to localhost instead of a
 		// container, so we can test without running a
 		// container.
@@ -86,6 +88,13 @@ func (s *ClientSuite) TestShellGateway(c *check.C) {
 	cmd.Env = append(cmd.Env, "ARVADOS_API_TOKEN="+arvadostest.ActiveTokenV2)
 	cmd.Stdout = &stdout
 	cmd.Stderr = &stderr
+	stdin, err := cmd.StdinPipe()
+	c.Assert(err, check.IsNil)
+	go fmt.Fprintln(stdin, "data appears on stdin, but stdin does not close; cmd should exit anyway, not hang")
+	time.AfterFunc(5*time.Second, func() {
+		c.Errorf("timed out -- remote end is probably hung waiting for us to close stdin")
+		stdin.Close()
+	})
 	c.Check(cmd.Run(), check.IsNil)
 	c.Check(stdout.String(), check.Equals, "ok\n")
 

commit a42604972cccf8dd9c8341c260927a6c48c62b84
Author: Tom Clegg <tom at curii.com>
Date:   Wed Jul 6 14:44:57 2022 -0400

    19166: Close ssh session when exec/shell command exits.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/cmd/arvados-client/container_gateway.go b/cmd/arvados-client/container_gateway.go
index aca6c5b79..55f8c33bc 100644
--- a/cmd/arvados-client/container_gateway.go
+++ b/cmd/arvados-client/container_gateway.go
@@ -160,7 +160,9 @@ Options:
 		fmt.Fprintf(stderr, "target UUID is not a container or container request UUID: %s\n", targetUUID)
 		return 1
 	}
-	sshconn, err := rpcconn.ContainerSSH(context.TODO(), arvados.ContainerSSHOptions{
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	sshconn, err := rpcconn.ContainerSSH(ctx, arvados.ContainerSSHOptions{
 		UUID:          targetUUID,
 		DetachKeys:    *detachKeys,
 		LoginUsername: loginUsername,
@@ -176,7 +178,6 @@ Options:
 		return 0
 	}
 
-	ctx, cancel := context.WithCancel(context.Background())
 	go func() {
 		defer cancel()
 		_, err := io.Copy(stdout, sshconn.Conn)
diff --git a/lib/crunchrun/container_gateway.go b/lib/crunchrun/container_gateway.go
index 6fae73798..1002de733 100644
--- a/lib/crunchrun/container_gateway.go
+++ b/lib/crunchrun/container_gateway.go
@@ -242,18 +242,16 @@ func (gw *Gateway) runTunnel(addr string) error {
 				defer wg.Done()
 				_, err := io.Copy(gwconn, muxconn)
 				if err != nil {
-					gw.Log.Printf("tunnel connection %d: tunnel: %s", muxconn.StreamID(), err)
+					gw.Log.Printf("tunnel connection %d: mux end: %s", muxconn.StreamID(), err)
 				}
-				muxconn.Close()
 				gwconn.Close()
 			}()
 			go func() {
 				defer wg.Done()
 				_, err := io.Copy(muxconn, gwconn)
 				if err != nil {
-					gw.Log.Printf("tunnel connection %d: gateway: %s", muxconn.StreamID(), err)
+					gw.Log.Printf("tunnel connection %d: gateway end: %s", muxconn.StreamID(), err)
 				}
-				gwconn.Close()
 				muxconn.Close()
 			}()
 			wg.Wait()
@@ -402,9 +400,11 @@ func (gw *Gateway) handleDirectTCPIP(ctx context.Context, newch ssh.NewChannel)
 func (gw *Gateway) handleSession(ctx context.Context, newch ssh.NewChannel, detachKeys, username string) {
 	ch, reqs, err := newch.Accept()
 	if err != nil {
-		gw.Log.Printf("accept session channel: %s", err)
+		gw.Log.Printf("error accepting session channel: %s", err)
 		return
 	}
+	defer ch.Close()
+
 	var pty0, tty0 *os.File
 	// Where to send errors/messages for the client to see
 	logw := io.Writer(ch.Stderr())
@@ -413,10 +413,28 @@ func (gw *Gateway) handleSession(ctx context.Context, newch ssh.NewChannel, deta
 	eol := "\n"
 	// Env vars to add to child process
 	termEnv := []string(nil)
-	for req := range reqs {
+
+	started := 0
+	wantClose := make(chan struct{})
+	for {
+		var req *ssh.Request
+		select {
+		case r, ok := <-reqs:
+			if !ok {
+				return
+			}
+			req = r
+		case <-wantClose:
+			return
+		}
 		ok := false
 		switch req.Type {
 		case "shell", "exec":
+			if started++; started != 1 {
+				// RFC 4254 6.5: "Only one of these
+				// requests can succeed per channel."
+				break
+			}
 			ok = true
 			var payload struct {
 				Command string
@@ -436,7 +454,7 @@ func (gw *Gateway) handleSession(ctx context.Context, newch ssh.NewChannel, deta
 				}
 				defer func() {
 					ch.SendRequest("exit-status", false, ssh.Marshal(&resp))
-					ch.Close()
+					close(wantClose)
 				}()
 
 				cmd, err := gw.Target.InjectCommand(ctx, detachKeys, username, tty0 != nil, execargs)
@@ -446,20 +464,39 @@ func (gw *Gateway) handleSession(ctx context.Context, newch ssh.NewChannel, deta
 					resp.Status = 1
 					return
 				}
-				cmd.Stdin = ch
-				cmd.Stdout = ch
-				cmd.Stderr = ch.Stderr()
 				if tty0 != nil {
 					cmd.Stdin = tty0
 					cmd.Stdout = tty0
 					cmd.Stderr = tty0
-					var wg sync.WaitGroup
-					defer wg.Wait()
-					wg.Add(2)
-					go func() { io.Copy(ch, pty0); wg.Done() }()
-					go func() { io.Copy(pty0, ch); wg.Done() }()
+					go io.Copy(ch, pty0)
+					go io.Copy(pty0, ch)
 					// Send our own debug messages to tty as well.
 					logw = tty0
+				} else {
+					// StdinPipe may seem
+					// superfluous here, but it's
+					// not: it causes cmd.Run() to
+					// return when the subprocess
+					// exits. Without it, Run()
+					// waits for stdin to close,
+					// which causes "ssh ... echo
+					// ok" (with the client's
+					// stdin connected to a
+					// terminal or something) to
+					// hang.
+					stdin, err := cmd.StdinPipe()
+					if err != nil {
+						fmt.Fprintln(ch.Stderr(), err)
+						ch.CloseWrite()
+						resp.Status = 1
+						return
+					}
+					go func() {
+						io.Copy(stdin, ch)
+						stdin.Close()
+					}()
+					cmd.Stdout = ch
+					cmd.Stderr = ch.Stderr()
 				}
 				cmd.SysProcAttr = &syscall.SysProcAttr{
 					Setctty: tty0 != nil,
@@ -527,7 +564,7 @@ func (gw *Gateway) handleSession(ctx context.Context, newch ssh.NewChannel, deta
 			// would be a gaping security
 			// hole).
 		default:
-			// fmt.Fprintf(logw, "declining %q req"+eol, req.Type)
+			fmt.Fprintf(logw, "declined request %q on ssh channel"+eol, req.Type)
 		}
 		if req.WantReply {
 			req.Reply(ok, nil)
diff --git a/sdk/go/arvados/container_gateway.go b/sdk/go/arvados/container_gateway.go
index ec16ee2be..897ae434e 100644
--- a/sdk/go/arvados/container_gateway.go
+++ b/sdk/go/arvados/container_gateway.go
@@ -34,8 +34,8 @@ func (cresp ConnectionResponse) ServeHTTP(w http.ResponseWriter, req *http.Reque
 	defer conn.Close()
 
 	var bytesIn, bytesOut int64
-	var wg sync.WaitGroup
 	ctx, cancel := context.WithCancel(req.Context())
+	var wg sync.WaitGroup
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
@@ -49,7 +49,6 @@ func (cresp ConnectionResponse) ServeHTTP(w http.ResponseWriter, req *http.Reque
 		if err != nil {
 			ctxlog.FromContext(ctx).WithError(err).Error("error copying downstream")
 		}
-		conn.Close()
 	}()
 	wg.Add(1)
 	go func() {
@@ -64,13 +63,17 @@ func (cresp ConnectionResponse) ServeHTTP(w http.ResponseWriter, req *http.Reque
 		if err != nil {
 			ctxlog.FromContext(ctx).WithError(err).Error("error copying upstream")
 		}
-		cresp.Conn.Close()
 	}()
-	wg.Wait()
-	if cresp.Logger != nil {
-		cresp.Logger.WithFields(logrus.Fields{
-			"bytesIn":  bytesIn,
-			"bytesOut": bytesOut,
-		}).Info("closed connection")
-	}
+	<-ctx.Done()
+	go func() {
+		// Wait for both io.Copy goroutines to finish and increment
+		// their byte counters.
+		wg.Wait()
+		if cresp.Logger != nil {
+			cresp.Logger.WithFields(logrus.Fields{
+				"bytesIn":  bytesIn,
+				"bytesOut": bytesOut,
+			}).Info("closed connection")
+		}
+	}()
 }

commit 817ee84de15cdc960990e86af8ce705073fcafba
Author: Tom Clegg <tom at curii.com>
Date:   Tue Jul 5 14:47:14 2022 -0400

    19166: Close connections to container gateway when finished.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/arvados/container_gateway.go b/sdk/go/arvados/container_gateway.go
index ce33fb310..ec16ee2be 100644
--- a/sdk/go/arvados/container_gateway.go
+++ b/sdk/go/arvados/container_gateway.go
@@ -15,6 +15,7 @@ import (
 )
 
 func (cresp ConnectionResponse) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+	defer cresp.Conn.Close()
 	hj, ok := w.(http.Hijacker)
 	if !ok {
 		http.Error(w, "ResponseWriter does not support connection upgrade", http.StatusInternalServerError)
@@ -48,6 +49,7 @@ func (cresp ConnectionResponse) ServeHTTP(w http.ResponseWriter, req *http.Reque
 		if err != nil {
 			ctxlog.FromContext(ctx).WithError(err).Error("error copying downstream")
 		}
+		conn.Close()
 	}()
 	wg.Add(1)
 	go func() {
@@ -62,6 +64,7 @@ func (cresp ConnectionResponse) ServeHTTP(w http.ResponseWriter, req *http.Reque
 		if err != nil {
 			ctxlog.FromContext(ctx).WithError(err).Error("error copying upstream")
 		}
+		cresp.Conn.Close()
 	}()
 	wg.Wait()
 	if cresp.Logger != nil {

commit 31a270c68c24b7de994655ee788478a64b6bdfb7
Author: Tom Clegg <tom at curii.com>
Date:   Tue Jul 5 14:38:42 2022 -0400

    19166: Improve debug/log messages.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/crunchrun/container_gateway.go b/lib/crunchrun/container_gateway.go
index 02df06cf2..6fae73798 100644
--- a/lib/crunchrun/container_gateway.go
+++ b/lib/crunchrun/container_gateway.go
@@ -168,6 +168,10 @@ func (gw *Gateway) Start() error {
 	if err != nil {
 		return err
 	}
+	go func() {
+		err := srv.Wait()
+		gw.Log.Printf("gateway server stopped: %s", err)
+	}()
 	// Get the port number we are listening on (extPort might be
 	// "0" or a port name, in which case this will be different).
 	_, listenPort, err := net.SplitHostPort(srv.Addr)
@@ -184,6 +188,7 @@ func (gw *Gateway) Start() error {
 	// non-tunnel connections aren't available; and PORT is the
 	// port number we are listening on.
 	gw.Address = net.JoinHostPort(extHost, listenPort)
+	gw.Log.Printf("gateway server listening at %s", gw.Address)
 	if gw.ArvadosClient != nil {
 		go gw.maintainTunnel(gw.Address)
 	}
@@ -218,16 +223,16 @@ func (gw *Gateway) runTunnel(addr string) error {
 		gw.UpdateTunnelURL(url)
 	}
 	for {
-		muxconn, err := mux.Accept()
+		muxconn, err := mux.AcceptStream()
 		if err != nil {
 			return err
 		}
-		gw.Log.Printf("receiving connection from tunnel, remoteAddr %s", muxconn.RemoteAddr().String())
+		gw.Log.Printf("tunnel connection %d started", muxconn.StreamID())
 		go func() {
 			defer muxconn.Close()
 			gwconn, err := net.Dial("tcp", addr)
 			if err != nil {
-				gw.Log.Printf("error connecting to %s on behalf of tunnel connection: %s", addr, err)
+				gw.Log.Printf("tunnel connection %d: error connecting to %s: %s", muxconn.StreamID(), addr, err)
 				return
 			}
 			defer gwconn.Close()
@@ -235,13 +240,24 @@ func (gw *Gateway) runTunnel(addr string) error {
 			wg.Add(2)
 			go func() {
 				defer wg.Done()
-				io.Copy(gwconn, muxconn)
+				_, err := io.Copy(gwconn, muxconn)
+				if err != nil {
+					gw.Log.Printf("tunnel connection %d: tunnel: %s", muxconn.StreamID(), err)
+				}
+				muxconn.Close()
+				gwconn.Close()
 			}()
 			go func() {
 				defer wg.Done()
-				io.Copy(muxconn, gwconn)
+				_, err := io.Copy(muxconn, gwconn)
+				if err != nil {
+					gw.Log.Printf("tunnel connection %d: gateway: %s", muxconn.StreamID(), err)
+				}
+				gwconn.Close()
+				muxconn.Close()
 			}()
 			wg.Wait()
+			gw.Log.Printf("tunnel connection %d finished", muxconn.StreamID())
 		}()
 	}
 }

commit bad3a575113abaf7a232f85dd15c4d1e1b60c0e2
Author: Tom Clegg <tom at curii.com>
Date:   Tue Jul 5 09:57:23 2022 -0400

    19166: Improve error messages.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/controller/localdb/container_gateway.go b/lib/controller/localdb/container_gateway.go
index 90c95deb3..77c5182e9 100644
--- a/lib/controller/localdb/container_gateway.go
+++ b/lib/controller/localdb/container_gateway.go
@@ -187,7 +187,7 @@ func (conn *Conn) ContainerSSH(ctx context.Context, opts arvados.ContainerSSHOpt
 	})
 	err = tlsconn.HandshakeContext(ctx)
 	if err != nil {
-		return sshconn, httpserver.ErrorWithStatus(err, http.StatusBadGateway)
+		return sshconn, httpserver.ErrorWithStatus(fmt.Errorf("TLS handshake failed: %w", err), http.StatusBadGateway)
 	}
 	if respondAuth == "" {
 		tlsconn.Close()
diff --git a/lib/controller/rpc/conn.go b/lib/controller/rpc/conn.go
index 1475a5e01..0e532f23c 100644
--- a/lib/controller/rpc/conn.go
+++ b/lib/controller/rpc/conn.go
@@ -414,7 +414,7 @@ func (conn *Conn) socket(ctx context.Context, u *url.URL, upgradeHeader string,
 		} else {
 			message = fmt.Sprintf("%q", body)
 		}
-		return connresp, fmt.Errorf("server did not provide a tunnel: %s %s", resp.Status, message)
+		return connresp, fmt.Errorf("server did not provide a tunnel: %s: %s", resp.Status, message)
 	}
 	if strings.ToLower(resp.Header.Get("Upgrade")) != upgradeHeader ||
 		strings.ToLower(resp.Header.Get("Connection")) != "upgrade" {

commit 3e27af4fa076222a18d602091060b9d11ee2079c
Author: Tom Clegg <tom at curii.com>
Date:   Tue Jul 5 09:56:51 2022 -0400

    19166: Support `crunch-run -version`.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index ff02257f2..0bf19083a 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -1743,6 +1743,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 	runtimeEngine := flags.String("runtime-engine", "docker", "container runtime: docker or singularity")
 	brokenNodeHook := flags.String("broken-node-hook", "", "script to run if node is detected to be broken (for example, Docker daemon is not running)")
 	flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
+	version := flags.Bool("version", false, "Write version information to stdout and exit 0.")
 
 	ignoreDetachFlag := false
 	if len(args) > 0 && args[0] == "-no-detach" {
@@ -1758,6 +1759,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 
 	if ok, code := cmd.ParseFlags(flags, prog, args, "container-uuid", stderr); !ok {
 		return code
+	} else if *version {
+		fmt.Fprintln(stdout, prog, cmd.Version.String())
+		return 0
 	} else if !*list && flags.NArg() != 1 {
 		fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n")
 		return 2

commit 49f99d7b9a8706c48baedba3659d7ca98b02dde3
Author: Tom Clegg <tom at curii.com>
Date:   Mon Jul 4 11:16:49 2022 -0400

    18947: Add arvados-server dispatch-slurm subcommand.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/cmd/arvados-server/cmd.go b/cmd/arvados-server/cmd.go
index 3a1fcd4c6..b3feca437 100644
--- a/cmd/arvados-server/cmd.go
+++ b/cmd/arvados-server/cmd.go
@@ -25,6 +25,7 @@ import (
 	"git.arvados.org/arvados.git/lib/service"
 	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"git.arvados.org/arvados.git/sdk/go/health"
+	dispatchslurm "git.arvados.org/arvados.git/services/crunch-dispatch-slurm"
 	"git.arvados.org/arvados.git/services/githttpd"
 	keepbalance "git.arvados.org/arvados.git/services/keep-balance"
 	keepweb "git.arvados.org/arvados.git/services/keep-web"
@@ -50,6 +51,7 @@ var (
 		"crunch-run":         crunchrun.Command,
 		"dispatch-cloud":     dispatchcloud.Command,
 		"dispatch-lsf":       lsf.DispatchCommand,
+		"dispatch-slurm":     dispatchslurm.Command,
 		"git-httpd":          githttpd.Command,
 		"health":             healthCommand,
 		"install":            install.Command,

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list