[arvados] created: 2.5.0-306-gf642876f2

git repository hosting git at public.arvados.org
Tue Apr 4 07:17:24 UTC 2023


        at  f642876f2d0831db063ce413df4c142ee6e5ee69 (commit)


commit f642876f2d0831db063ce413df4c142ee6e5ee69
Author: Tom Clegg <tom at curii.com>
Date:   Tue Apr 4 03:17:12 2023 -0400

    18790: Add arvados-client logs command.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/cmd/arvados-client/cmd.go b/cmd/arvados-client/cmd.go
index c10783c97..19d13437c 100644
--- a/cmd/arvados-client/cmd.go
+++ b/cmd/arvados-client/cmd.go
@@ -55,12 +55,13 @@ var (
 		"virtual_machine":          cli.APICall,
 		"workflow":                 cli.APICall,
 
-		"mount":                mount.Command,
-		"deduplication-report": deduplicationreport.Command,
-		"costanalyzer":         costanalyzer.Command,
-		"shell":                shellCommand{},
 		"connect-ssh":          connectSSHCommand{},
+		"costanalyzer":         costanalyzer.Command,
+		"deduplication-report": deduplicationreport.Command,
 		"diagnostics":          diagnostics.Command{},
+		"logs":                 logsCommand{},
+		"mount":                mount.Command,
+		"shell":                shellCommand{},
 		"sudo":                 sudoCommand{},
 	})
 )
diff --git a/cmd/arvados-client/container_gateway.go b/cmd/arvados-client/container_gateway.go
index 55f8c33bc..7a3519042 100644
--- a/cmd/arvados-client/container_gateway.go
+++ b/cmd/arvados-client/container_gateway.go
@@ -7,21 +7,209 @@ package main
 import (
 	"bytes"
 	"context"
+	"crypto/tls"
 	"flag"
 	"fmt"
 	"io"
+	"net/http"
 	"net/url"
 	"os"
 	"os/exec"
 	"path/filepath"
 	"strings"
 	"syscall"
+	"time"
 
 	"git.arvados.org/arvados.git/lib/cmd"
 	"git.arvados.org/arvados.git/lib/controller/rpc"
 	"git.arvados.org/arvados.git/sdk/go/arvados"
 )
 
+// logsCommand implements the "logs" subcommand, which displays logs
+// from a container (running or finished) identified by either a
+// container UUID or a container request UUID.
+type logsCommand struct {
+	// API client used for log file HTTP requests. RunCommand
+	// replaces it with a client configured from the environment
+	// before any requests are made.
+	ac *arvados.Client
+}
+
+// RunCommand implements the "logs" subcommand. It parses flags,
+// builds an HTTP client from the ARVADOS_API_* environment
+// variables, and follows the logs of the given container (or of the
+// container assigned to the given container request) until it
+// finishes.
+//
+// Flag-parsing outcomes (including -help) are passed through from
+// cmd.ParseFlags. Missing or extra arguments and a non-positive
+// -poll value return 2. An error while following logs is printed to
+// stderr and returns 1.
+func (lc logsCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+	f := flag.NewFlagSet(prog, flag.ContinueOnError)
+	pollInterval := f.Duration("poll", time.Second*2, "minimum duration to wait before polling for new data")
+	if ok, code := cmd.ParseFlags(f, prog, args, "container-uuid", stderr); !ok {
+		return code
+	} else if f.NArg() < 1 {
+		fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n")
+		return 2
+	} else if f.NArg() > 1 {
+		fmt.Fprintf(stderr, "encountered extra arguments after container-uuid (try -help)\n")
+		return 2
+	} else if *pollInterval <= 0 {
+		// A zero or negative interval would make tailf poll
+		// the API server in a tight loop with no delay.
+		fmt.Fprintf(stderr, "invalid -poll value %s: must be greater than zero\n", *pollInterval)
+		return 2
+	}
+	target := f.Args()[0]
+
+	lc.ac = arvados.NewClientFromEnv()
+	lc.ac.Client = &http.Client{}
+	if lc.ac.Insecure {
+		// Start from a copy of http.DefaultTransport so proxy
+		// settings (HTTPS_PROXY etc.) and dial/TLS handshake
+		// timeouts still apply when certificate verification
+		// is disabled. A bare &http.Transport{} would silently
+		// drop them, unlike the secure (nil Transport) case.
+		transport := &http.Transport{}
+		if dt, ok := http.DefaultTransport.(*http.Transport); ok {
+			transport = dt.Clone()
+		}
+		transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
+		lc.ac.Client.Transport = transport
+	}
+
+	err := lc.tailf(target, stdout, stderr, *pollInterval)
+	if err != nil {
+		fmt.Fprintln(stderr, err)
+		return 1
+	}
+	return 0
+}
+
+// tailf follows the log files of a container and copies new content
+// to stdout as it appears.
+//
+// target is either a container UUID or a container request UUID. In
+// the latter case, when the watched container finishes and the
+// request has been assigned a different container, tailf switches
+// to following the new container.
+//
+// When a file is first seen, only its last 10000 bytes are shown.
+// Transient errors are reported on stderr and polling continues.
+// Between polls tailf waits pollInterval, backing off to at most
+// 5*pollInterval while no new data arrives. It returns nil once the
+// container is Complete or Cancelled and the available log data has
+// been copied.
+func (lc *logsCommand) tailf(target string, stdout, stderr io.Writer, pollInterval time.Duration) error {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	rpcconn := rpcFromEnv()
+	ctrUUID, err := resolveToContainerUUID(rpcconn, target)
+	if err != nil {
+		return err
+	}
+	fmt.Fprintln(stderr, "connecting to container", ctrUUID)
+
+	var (
+		// files to display
+		watching = []string{"crunch-run.txt", "stderr.txt"}
+		// fnm => file offset of next byte to display
+		mark = map[string]int64{}
+		// fnm => current size of file reported by api
+		size = map[string]int64{}
+		// exit after fetching next log chunk
+		containerFinished = false
+	)
+
+poll:
+	for delay := pollInterval; ; time.Sleep(delay) {
+		// When /arvados/v1/containers/{uuid}/log_events is
+		// implemented, we'll wait here for the next
+		// server-sent event to tell us some updated file
+		// sizes. For now, we poll.
+		for _, fnm := range watching {
+			// A "0-0" request with a nil writer is used
+			// only to learn the current file size.
+			currentsize, _, err := lc.copyRange(ctx, ctrUUID, fnm, "0-0", nil)
+			if err != nil {
+				fmt.Fprintln(stderr, err)
+				delay = pollInterval
+				continue poll
+			}
+			size[fnm] = currentsize
+			if oldsize, seen := mark[fnm]; !seen && currentsize > 10000 {
+				// First sighting of a large file:
+				// show only the tail.
+				mark[fnm] = currentsize - 10000
+			} else if !seen {
+				mark[fnm] = 0
+			} else if currentsize < oldsize {
+				// Log collection must have been
+				// emptied and reset.
+				fmt.Fprintln(stderr, "--- log restarted ---")
+				for fnm := range mark {
+					delete(mark, fnm)
+				}
+				delay = pollInterval
+				continue poll
+			}
+		}
+		anyNewData := false
+		for _, fnm := range watching {
+			if size[fnm] > mark[fnm] {
+				anyNewData = true
+				_, n, err := lc.copyRange(ctx, ctrUUID, fnm, fmt.Sprintf("%d-", mark[fnm]), stdout)
+				if err != nil {
+					fmt.Fprintln(stderr, err)
+				}
+				// Advance by however much was actually
+				// written, even if the copy was cut short.
+				mark[fnm] += n
+			}
+		}
+		if containerFinished {
+			// If the caller specified a container request
+			// UUID and the container we were watching has
+			// been replaced by a new one, start watching
+			// logs from the new one. Otherwise, we're
+			// done.
+			if target == ctrUUID {
+				// caller specified container UUID
+				return nil
+			}
+			newUUID, err := resolveToContainerUUID(rpcconn, target)
+			if err != nil {
+				return err
+			}
+			if newUUID == ctrUUID {
+				// no further attempts
+				return nil
+			}
+			ctrUUID = newUUID
+			fmt.Fprintln(stderr, "connecting to container", ctrUUID)
+			// The new container's log files are unrelated
+			// to the old one's. Discard the old offsets and
+			// sizes; otherwise the beginning of the new
+			// logs (up to the old offset) would be
+			// silently skipped.
+			mark = map[string]int64{}
+			size = map[string]int64{}
+			containerFinished = false
+			delay = 0
+		} else if anyNewData {
+			delay = pollInterval
+		} else {
+			// Nothing new: back off and check whether the
+			// container has finished.
+			delay = delay * 2
+			if delay < pollInterval {
+				// delay may have been set to 0 to
+				// force an immediate re-poll. Doubling
+				// 0 would leave it at 0 and busy-loop
+				// against the API server.
+				delay = pollInterval
+			} else if delay > pollInterval*5 {
+				delay = pollInterval * 5
+			}
+			ctr, err := rpcconn.ContainerGet(ctx, arvados.GetOptions{UUID: ctrUUID, Select: []string{"state"}})
+			if err != nil {
+				fmt.Fprintln(stderr, err)
+				delay = pollInterval
+				continue
+			}
+			if ctr.State == arvados.ContainerStateCancelled || ctr.State == arvados.ContainerStateComplete {
+				// Make one more immediate pass to
+				// collect any final log data, then
+				// exit (or switch containers).
+				containerFinished = true
+				delay = 0
+			}
+		}
+	}
+}
+
+// copyRange retrieves the specified byte range (e.g., "12-34",
+// "1234-") of log file fnm of container uuid and writes it to out.
+//
+// If out is nil, nothing is written and only the file size is
+// reported. Callers use this with the range "0-0" to probe the size.
+//
+// Return values are current file size, bytes copied, error.
+//
+// If the file does not exist, return values are 0, 0, nil.
+//
+// If the server reports the range as unsatisfiable (HTTP 416), the
+// size from its "Content-Range: bytes */<size>" header is returned
+// with 0 bytes copied rather than an error. An RFC 7233 server such
+// as Go's http.ServeContent responds this way to "0-0" on an empty
+// file.
+func (lc *logsCommand) copyRange(ctx context.Context, uuid, fnm, byterange string, out io.Writer) (int64, int64, error) {
+	ctx, cancel := context.WithDeadline(ctx, time.Now().Add(20*time.Second))
+	defer cancel()
+	srcURL := url.URL{
+		Scheme: "https",
+		Host:   lc.ac.APIHost,
+		Path:   "/arvados/v1/containers/" + uuid + "/log/" + fnm,
+	}
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, srcURL.String(), nil)
+	if err != nil {
+		return 0, 0, err
+	}
+	req.Header.Set("Range", "bytes="+byterange)
+	req.Header.Set("Authorization", "Bearer "+lc.ac.AuthToken)
+	resp, err := lc.ac.Client.Do(req)
+	if err != nil {
+		return 0, 0, err
+	}
+	defer resp.Body.Close()
+	contentRange := resp.Header.Get("Content-Range")
+	switch resp.StatusCode {
+	case http.StatusOK, http.StatusPartialContent:
+		// handled below
+	case http.StatusNotFound:
+		return 0, 0, nil
+	case http.StatusRequestedRangeNotSatisfiable:
+		// Without this case, an empty log file would make
+		// every size probe fail, and the caller would never
+		// get as far as checking whether the container has
+		// finished.
+		var rsize int64
+		if _, err := fmt.Sscanf(contentRange, "bytes */%d", &rsize); err != nil {
+			return 0, 0, fmt.Errorf("error getting %s: %s", fnm, resp.Status)
+		}
+		return rsize, 0, nil
+	default:
+		return 0, 0, fmt.Errorf("error getting %s: %s", fnm, resp.Status)
+	}
+	var rstart, rend, rsize int64
+	if _, err := fmt.Sscanf(contentRange, "bytes %d-%d/%d", &rstart, &rend, &rsize); err != nil {
+		return 0, 0, fmt.Errorf("error parsing Content-Range header %q: %w", contentRange, err)
+	}
+	if out == nil {
+		// Best effort: consume the (tiny) probe body so the
+		// keep-alive connection can be reused for the next
+		// poll instead of being torn down on Close.
+		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, 1<<16))
+		return rsize, 0, nil
+	}
+	n, err := io.Copy(out, resp.Body)
+	return rsize, n, err
+}
+
 // shellCommand connects the terminal to an interactive shell on a
 // running container.
 type shellCommand struct{}
@@ -129,37 +317,14 @@ Options:
 		fmt.Fprintln(stderr, "fatal: ARVADOS_API_HOST and ARVADOS_API_TOKEN environment variables are not set")
 		return 1
 	}
-	insecure := os.Getenv("ARVADOS_API_HOST_INSECURE")
-	rpcconn := rpc.NewConn("",
-		&url.URL{
-			Scheme: "https",
-			Host:   os.Getenv("ARVADOS_API_HOST"),
-		},
-		insecure == "1" || insecure == "yes" || insecure == "true",
-		func(context.Context) ([]string, error) {
-			return []string{os.Getenv("ARVADOS_API_TOKEN")}, nil
-		})
-	if strings.Contains(targetUUID, "-xvhdp-") {
-		crs, err := rpcconn.ContainerRequestList(context.TODO(), arvados.ListOptions{Limit: -1, Filters: []arvados.Filter{{"uuid", "=", targetUUID}}})
-		if err != nil {
-			fmt.Fprintln(stderr, err)
-			return 1
-		}
-		if len(crs.Items) < 1 {
-			fmt.Fprintf(stderr, "container request %q not found\n", targetUUID)
-			return 1
-		}
-		cr := crs.Items[0]
-		if cr.ContainerUUID == "" {
-			fmt.Fprintf(stderr, "no container assigned, container request state is %s\n", strings.ToLower(string(cr.State)))
-			return 1
-		}
-		targetUUID = cr.ContainerUUID
-		fmt.Fprintln(stderr, "connecting to container", targetUUID)
-	} else if !strings.Contains(targetUUID, "-dz642-") {
-		fmt.Fprintf(stderr, "target UUID is not a container or container request UUID: %s\n", targetUUID)
+	rpcconn := rpcFromEnv()
+	targetUUID, err := resolveToContainerUUID(rpcconn, targetUUID)
+	if err != nil {
+		fmt.Fprintln(stderr, err)
 		return 1
 	}
+	fmt.Fprintln(stderr, "connecting to container", targetUUID)
+
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	sshconn, err := rpcconn.ContainerSSH(ctx, arvados.ContainerSSHOptions{
@@ -199,3 +364,38 @@ Options:
 func shellescape(s string) string {
 	// Quote s as one single-quoted shell word. A single quote
 	// cannot appear inside '...', so each one is emitted as
 	// '\'' (close quote, backslash-escaped quote, reopen quote).
 	const quotedQuote = `'\''`
 	return "'" + strings.ReplaceAll(s, "'", quotedQuote) + "'"
 }
+
+// rpcFromEnv builds an RPC connection to the API server named by
+// $ARVADOS_API_HOST, always over https. TLS certificate verification
+// is disabled when $ARVADOS_API_HOST_INSECURE is "1", "yes", or
+// "true". The token provider reads $ARVADOS_API_TOKEN each time the
+// connection invokes it.
+func rpcFromEnv() *rpc.Conn {
+	skipVerify := false
+	switch os.Getenv("ARVADOS_API_HOST_INSECURE") {
+	case "1", "yes", "true":
+		skipVerify = true
+	}
+	apiURL := &url.URL{Scheme: "https", Host: os.Getenv("ARVADOS_API_HOST")}
+	tokens := func(context.Context) ([]string, error) {
+		return []string{os.Getenv("ARVADOS_API_TOKEN")}, nil
+	}
+	return rpc.NewConn("", apiURL, skipVerify, tokens)
+}
+
+// resolveToContainerUUID returns the container UUID identified by
+// targetUUID.
+//
+// A container UUID (containing "-dz642-") is returned unchanged. For
+// a container request UUID (containing "-xvhdp-"), the request is
+// looked up via rpcconn and its currently assigned container UUID is
+// returned. An error is returned if the request lookup fails, the
+// request does not exist, no container has been assigned yet, or
+// targetUUID is neither kind of UUID.
+//
+// Error messages carry no trailing newline; callers print them with
+// fmt.Fprintln.
+func resolveToContainerUUID(rpcconn *rpc.Conn, targetUUID string) (string, error) {
+	switch {
+	case strings.Contains(targetUUID, "-dz642-"):
+		return targetUUID, nil
+	case strings.Contains(targetUUID, "-xvhdp-"):
+		crs, err := rpcconn.ContainerRequestList(context.TODO(), arvados.ListOptions{Limit: -1, Filters: []arvados.Filter{{"uuid", "=", targetUUID}}})
+		if err != nil {
+			return "", err
+		}
+		if len(crs.Items) < 1 {
+			return "", fmt.Errorf("container request %q not found", targetUUID)
+		}
+		cr := crs.Items[0]
+		if cr.ContainerUUID == "" {
+			return "", fmt.Errorf("no container assigned, container request state is %s", strings.ToLower(string(cr.State)))
+		}
+		return cr.ContainerUUID, nil
+	default:
+		return "", fmt.Errorf("target UUID is not a container or container request UUID: %s", targetUUID)
+	}
+}
diff --git a/cmd/arvados-client/container_gateway_test.go b/cmd/arvados-client/container_gateway_test.go
index 743b91d69..0e5aad709 100644
--- a/cmd/arvados-client/container_gateway_test.go
+++ b/cmd/arvados-client/container_gateway_test.go
@@ -10,6 +10,7 @@ import (
 	"crypto/hmac"
 	"crypto/sha256"
 	"fmt"
+	"io"
 	"io/ioutil"
 	"net"
 	"net/http"
@@ -24,9 +25,11 @@ import (
 	"git.arvados.org/arvados.git/lib/controller/rpc"
 	"git.arvados.org/arvados.git/lib/crunchrun"
 	"git.arvados.org/arvados.git/sdk/go/arvados"
+	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
 	"git.arvados.org/arvados.git/sdk/go/arvadostest"
 	"git.arvados.org/arvados.git/sdk/go/ctxlog"
 	"git.arvados.org/arvados.git/sdk/go/httpserver"
+	"git.arvados.org/arvados.git/sdk/go/keepclient"
 	check "gopkg.in/check.v1"
 )
 
@@ -178,3 +181,133 @@ func (s *ClientSuite) TestShellGateway(c *check.C) {
 	}
 	wg.Wait()
 }
+
+// TestContainerLog runs "arvados-client logs" on a container request
+// while its container goes from Queued through Running (log data
+// served by a stub gateway) to Complete. It checks that log content
+// written while the container is running gets displayed. It then
+// re-runs the command on the completed container, with the gateway's
+// log collection removed, to check that the saved log is displayed.
+func (s *ClientSuite) TestContainerLog(c *check.C) {
+	arvadostest.StartKeep(2, true)
+	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(20*time.Second))
+	defer cancel()
+
+	rpcconn := rpc.NewConn("",
+		&url.URL{
+			Scheme: "https",
+			Host:   os.Getenv("ARVADOS_API_HOST"),
+		},
+		true,
+		func(context.Context) ([]string, error) {
+			return []string{arvadostest.SystemRootToken}, nil
+		})
+	imageColl, err := rpcconn.CollectionCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{
+		"manifest_text": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855.tar\n",
+	}})
+	c.Assert(err, check.IsNil)
+	c.Logf("imageColl %+v", imageColl)
+	cr, err := rpcconn.ContainerRequestCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{
+		"state":           "Committed",
+		"command":         []string{"echo", fmt.Sprintf("%d", time.Now().Unix())},
+		"container_image": imageColl.PortableDataHash,
+		"cwd":             "/",
+		"output_path":     "/",
+		"priority":        1,
+		"runtime_constraints": arvados.RuntimeConstraints{
+			VCPUs: 1,
+			RAM:   1000000000,
+		},
+		"container_count_max": 1,
+	}})
+	c.Assert(err, check.IsNil)
+	h := hmac.New(sha256.New, []byte(arvadostest.SystemRootToken))
+	fmt.Fprint(h, cr.ContainerUUID)
+	authSecret := fmt.Sprintf("%x", h.Sum(nil))
+
+	coll := arvados.Collection{}
+	client := arvados.NewClientFromEnv()
+	ac, err := arvadosclient.New(client)
+	c.Assert(err, check.IsNil)
+	kc, err := keepclient.MakeKeepClient(ac)
+	c.Assert(err, check.IsNil)
+	cfs, err := coll.FileSystem(client, kc)
+	c.Assert(err, check.IsNil)
+
+	c.Log("running logs command on queued container")
+	// The command's stdout is relayed through a pipe to a
+	// goroutine that owns the accumulated output. Polling a
+	// bytes.Buffer that exec's copying goroutine writes to
+	// concurrently would be a data race. sawEnd is closed once
+	// "--end--\n" has been received; the full output is sent on
+	// stdoutDone after the pipe's write end is closed.
+	stdoutR, stdoutW := io.Pipe()
+	defer stdoutW.Close()
+	sawEnd := make(chan struct{})
+	stdoutDone := make(chan string, 1)
+	go func() {
+		var buf bytes.Buffer
+		chunk := make([]byte, 8192)
+		signaled := false
+		for {
+			n, err := stdoutR.Read(chunk)
+			buf.Write(chunk[:n])
+			if !signaled && strings.Contains(buf.String(), "--end--\n") {
+				signaled = true
+				close(sawEnd)
+			}
+			if err != nil {
+				break
+			}
+		}
+		stdoutDone <- buf.String()
+	}()
+	cmd := exec.CommandContext(ctx, "go", "run", ".", "logs", "-poll=250ms", cr.UUID)
+	cmd.Env = append(cmd.Env, os.Environ()...)
+	cmd.Env = append(cmd.Env, "ARVADOS_API_TOKEN="+arvadostest.SystemRootToken)
+	cmd.Stdout = io.MultiWriter(stdoutW, os.Stderr)
+	cmd.Stderr = os.Stderr
+	err = cmd.Start()
+	c.Assert(err, check.Equals, nil)
+
+	c.Log("changing container state to Locked")
+	_, err = rpcconn.ContainerUpdate(ctx, arvados.UpdateOptions{UUID: cr.ContainerUUID, Attrs: map[string]interface{}{
+		"state": arvados.ContainerStateLocked,
+	}})
+	c.Assert(err, check.IsNil)
+	c.Log("starting gateway")
+	gw := crunchrun.Gateway{
+		ContainerUUID: cr.ContainerUUID,
+		Address:       "0.0.0.0:0",
+		AuthSecret:    authSecret,
+		Log:           ctxlog.TestLogger(c),
+		Target:        crunchrun.GatewayTargetStub{},
+		LogCollection: cfs,
+	}
+	err = gw.Start()
+	c.Assert(err, check.IsNil)
+	c.Log("updating container gateway address")
+	_, err = rpcconn.ContainerUpdate(ctx, arvados.UpdateOptions{UUID: cr.ContainerUUID, Attrs: map[string]interface{}{
+		"gateway_address": gw.Address,
+		"state":           arvados.ContainerStateRunning,
+	}})
+	c.Assert(err, check.IsNil)
+
+	fCrunchrun, err := cfs.OpenFile("crunch-run.txt", os.O_CREATE|os.O_WRONLY, 0777)
+	c.Assert(err, check.IsNil)
+	_, err = fmt.Fprintln(fCrunchrun, "line 1 of crunch-run.txt")
+	c.Assert(err, check.IsNil)
+	fStderr, err := cfs.OpenFile("stderr.txt", os.O_CREATE|os.O_WRONLY, 0777)
+	c.Assert(err, check.IsNil)
+	_, err = fmt.Fprintln(fStderr, "line 1 of stderr")
+	c.Assert(err, check.IsNil)
+	time.Sleep(time.Second * 2)
+	_, err = fmt.Fprintln(fCrunchrun, "line 2 of crunch-run.txt")
+	c.Assert(err, check.IsNil)
+	_, err = fmt.Fprintln(fStderr, "--end--")
+	c.Assert(err, check.IsNil)
+
+	// The live logs must reach the command's stdout before the
+	// container is marked Complete below.
+	select {
+	case <-sawEnd:
+	case <-time.After(20 * time.Second):
+		c.Error("timed out waiting for logs command to display --end--")
+	}
+
+	mtxt, err := cfs.MarshalManifest(".")
+	c.Assert(err, check.IsNil)
+	savedLog, err := rpcconn.CollectionCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{
+		"manifest_text": mtxt,
+	}})
+	c.Assert(err, check.IsNil)
+	_, err = rpcconn.ContainerUpdate(ctx, arvados.UpdateOptions{UUID: cr.ContainerUUID, Attrs: map[string]interface{}{
+		"state":     arvados.ContainerStateComplete,
+		"log":       savedLog.PortableDataHash,
+		"output":    "d41d8cd98f00b204e9800998ecf8427e+0",
+		"exit_code": 0,
+	}})
+	c.Assert(err, check.IsNil)
+
+	err = cmd.Wait()
+	c.Check(err, check.IsNil)
+	// Wait has returned, so all stdout has been written to the
+	// pipe. Closing it lets the reader goroutine finish.
+	stdoutW.Close()
+	c.Check(<-stdoutDone, check.Matches, `(?ms).*--end--\n.*`)
+	// Ensure controller doesn't cheat by fetching data from the
+	// gateway after the container is complete.
+	gw.LogCollection = nil
+
+	c.Logf("re-running logs command on completed container")
+	{
+		ctx, cancel := context.WithDeadline(ctx, time.Now().Add(time.Second*5))
+		defer cancel()
+		cmd := exec.CommandContext(ctx, "go", "run", ".", "logs", cr.UUID)
+		cmd.Env = append(cmd.Env, os.Environ()...)
+		cmd.Env = append(cmd.Env, "ARVADOS_API_TOKEN="+arvadostest.SystemRootToken)
+		buf, err := cmd.CombinedOutput()
+		c.Check(err, check.Equals, nil)
+		c.Check(string(buf), check.Matches, `(?ms).*--end--\n`)
+	}
+}
diff --git a/lib/controller/rpc/conn.go b/lib/controller/rpc/conn.go
index 70a936a6f..9856eb576 100644
--- a/lib/controller/rpc/conn.go
+++ b/lib/controller/rpc/conn.go
@@ -340,6 +340,12 @@ func (conn *Conn) ContainerUnlock(ctx context.Context, options arvados.GetOption
 }
 
 func (conn *Conn) ContainerLog(ctx context.Context, options arvados.ContainerLogOptions) (resp http.Handler, err error) {
+	tokens, err := conn.tokenProvider(ctx)
+	if err != nil {
+		return nil, err
+	} else if len(tokens) < 1 {
+		return nil, httpserver.ErrorWithStatus(errors.New("unauthorized"), http.StatusUnauthorized)
+	}
 	proxy := &httputil.ReverseProxy{
 		Transport: conn.httpClient.Transport,
 		Director: func(r *http.Request) {
@@ -347,6 +353,7 @@ func (conn *Conn) ContainerLog(ctx context.Context, options arvados.ContainerLog
 			u.Path = r.URL.Path
 			u.RawQuery = fmt.Sprintf("no_forward=%v", options.NoForward)
 			r.URL = &u
+			r.Header.Set("Authorization", "Bearer "+tokens[0])
 		},
 	}
 	return proxy, nil

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list