[arvados] created: 2.5.0-306-gf642876f2
git repository hosting
git at public.arvados.org
Tue Apr 4 07:17:24 UTC 2023
at f642876f2d0831db063ce413df4c142ee6e5ee69 (commit)
commit f642876f2d0831db063ce413df4c142ee6e5ee69
Author: Tom Clegg <tom at curii.com>
Date: Tue Apr 4 03:17:12 2023 -0400
18790: Add arvados-client logs command.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/cmd/arvados-client/cmd.go b/cmd/arvados-client/cmd.go
index c10783c97..19d13437c 100644
--- a/cmd/arvados-client/cmd.go
+++ b/cmd/arvados-client/cmd.go
@@ -55,12 +55,13 @@ var (
"virtual_machine": cli.APICall,
"workflow": cli.APICall,
- "mount": mount.Command,
- "deduplication-report": deduplicationreport.Command,
- "costanalyzer": costanalyzer.Command,
- "shell": shellCommand{},
"connect-ssh": connectSSHCommand{},
+ "costanalyzer": costanalyzer.Command,
+ "deduplication-report": deduplicationreport.Command,
"diagnostics": diagnostics.Command{},
+ "logs": logsCommand{},
+ "mount": mount.Command,
+ "shell": shellCommand{},
"sudo": sudoCommand{},
})
)
diff --git a/cmd/arvados-client/container_gateway.go b/cmd/arvados-client/container_gateway.go
index 55f8c33bc..7a3519042 100644
--- a/cmd/arvados-client/container_gateway.go
+++ b/cmd/arvados-client/container_gateway.go
@@ -7,21 +7,209 @@ package main
import (
"bytes"
"context"
+ "crypto/tls"
"flag"
"fmt"
"io"
+ "net/http"
"net/url"
"os"
"os/exec"
"path/filepath"
"strings"
"syscall"
+ "time"
"git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/lib/controller/rpc"
"git.arvados.org/arvados.git/sdk/go/arvados"
)
+// logsCommand displays logs from a running container.
+type logsCommand struct {
+ ac *arvados.Client
+}
+
+func (lc logsCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+ f := flag.NewFlagSet(prog, flag.ContinueOnError)
+ pollInterval := f.Duration("poll", time.Second*2, "minimum duration to wait before polling for new data")
+ if ok, code := cmd.ParseFlags(f, prog, args, "container-uuid", stderr); !ok {
+ return code
+ } else if f.NArg() < 1 {
+ fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n")
+ return 2
+ } else if f.NArg() > 1 {
+ fmt.Fprintf(stderr, "encountered extra arguments after container-uuid (try -help)\n")
+ return 2
+ }
+ target := f.Args()[0]
+
+ lc.ac = arvados.NewClientFromEnv()
+ lc.ac.Client = &http.Client{}
+ if lc.ac.Insecure {
+ lc.ac.Client.Transport = &http.Transport{
+ TLSClientConfig: &tls.Config{
+ InsecureSkipVerify: true}}
+ }
+
+ err := lc.tailf(target, stdout, stderr, *pollInterval)
+ if err != nil {
+ fmt.Fprintln(stderr, err)
+ return 1
+ }
+ return 0
+}
+
+func (lc *logsCommand) tailf(target string, stdout, stderr io.Writer, pollInterval time.Duration) error {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ rpcconn := rpcFromEnv()
+ ctrUUID, err := resolveToContainerUUID(rpcconn, target)
+ if err != nil {
+ return err
+ }
+ fmt.Fprintln(stderr, "connecting to container", ctrUUID)
+
+ var (
+ // files to display
+ watching = []string{"crunch-run.txt", "stderr.txt"}
+ // fnm => file offset of next byte to display
+ mark = map[string]int64{}
+ // fnm => current size of file reported by api
+ size = map[string]int64{}
+ // exit after fetching next log chunk
+ containerFinished = false
+ )
+
+poll:
+ for delay := pollInterval; ; time.Sleep(delay) {
+ // When /arvados/v1/containers/{uuid}/log_events is
+ // implemented, we'll wait here for the next
+ // server-sent event to tell us some updated file
+ // sizes. For now, we poll.
+ for _, fnm := range watching {
+ currentsize, _, err := lc.copyRange(ctx, ctrUUID, fnm, "0-0", nil)
+ if err != nil {
+ fmt.Fprintln(stderr, err)
+ delay = pollInterval
+ continue poll
+ }
+ size[fnm] = currentsize
+ if oldsize, seen := mark[fnm]; !seen && currentsize > 10000 {
+ mark[fnm] = currentsize - 10000
+ } else if !seen {
+ mark[fnm] = 0
+ } else if currentsize < oldsize {
+ // Log collection must have been
+ // emptied and reset.
+ fmt.Fprintln(stderr, "--- log restarted ---")
+ for fnm := range mark {
+ delete(mark, fnm)
+ }
+ delay = pollInterval
+ continue poll
+ }
+ }
+ anyNewData := false
+ for _, fnm := range watching {
+ if size[fnm] > mark[fnm] {
+ anyNewData = true
+ _, n, err := lc.copyRange(ctx, ctrUUID, fnm, fmt.Sprintf("%d-", mark[fnm]), stdout)
+ if err != nil {
+ fmt.Fprintln(stderr, err)
+ }
+ mark[fnm] += n
+ }
+ }
+ if containerFinished {
+ // If the caller specified a container request
+ // UUID and the container we were watching has
+ // been replaced by a new one, start watching
+ // logs from the new one. Otherwise, we're
+ // done.
+ if target == ctrUUID {
+ // caller specified container UUID
+ return nil
+ }
+ newUUID, err := resolveToContainerUUID(rpcconn, target)
+ if err != nil {
+ return err
+ }
+ if newUUID == ctrUUID {
+ // no further attempts
+ return nil
+ }
+ ctrUUID = newUUID
+ containerFinished = false
+ delay = 0
+ } else if anyNewData {
+ delay = pollInterval
+ } else {
+ delay = delay * 2
+ if delay > pollInterval*5 {
+ delay = pollInterval * 5
+ }
+ ctr, err := rpcconn.ContainerGet(ctx, arvados.GetOptions{UUID: ctrUUID, Select: []string{"state"}})
+ if err != nil {
+ fmt.Fprintln(stderr, err)
+ delay = pollInterval
+ continue
+ }
+ if ctr.State == arvados.ContainerStateCancelled || ctr.State == arvados.ContainerStateComplete {
+ containerFinished = true
+ delay = 0
+ }
+ }
+ }
+ return nil
+}
+
+// Retrieve specified byte range (e.g., "12-34", "1234-") from given
+// fnm and write to out.
+//
+// If range is empty ("0-0"), out can be nil.
+//
+// Return values are current file size, bytes copied, error.
+//
+// If the file does not exist, return values are 0, 0, nil.
+func (lc *logsCommand) copyRange(ctx context.Context, uuid, fnm, byterange string, out io.Writer) (int64, int64, error) {
+ ctx, cancel := context.WithDeadline(ctx, time.Now().Add(20*time.Second))
+ defer cancel()
+ srcURL := url.URL{
+ Scheme: "https",
+ Host: lc.ac.APIHost,
+ Path: "/arvados/v1/containers/" + uuid + "/log/" + fnm,
+ }
+ req, err := http.NewRequestWithContext(ctx, http.MethodGet, srcURL.String(), nil)
+ if err != nil {
+ return 0, 0, err
+ }
+ req.Header.Set("Range", "bytes="+byterange)
+ req.Header.Set("Authorization", "Bearer "+lc.ac.AuthToken)
+ resp, err := lc.ac.Client.Do(req)
+ if err != nil {
+ return 0, 0, err
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode == http.StatusNotFound {
+ return 0, 0, nil
+ }
+ if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
+ return 0, 0, fmt.Errorf("error getting %s: %s", fnm, resp.Status)
+ }
+ var rstart, rend, rsize int64
+ _, err = fmt.Sscanf(resp.Header.Get("Content-Range"), "bytes %d-%d/%d", &rstart, &rend, &rsize)
+ if err != nil {
+ return 0, 0, fmt.Errorf("error parsing Content-Range header %q: %s", resp.Header.Get("Content-Range"), err)
+ }
+ if out == nil {
+ return rsize, 0, nil
+ }
+ n, err := io.Copy(out, resp.Body)
+ return rsize, n, err
+}
+
// shellCommand connects the terminal to an interactive shell on a
// running container.
type shellCommand struct{}
@@ -129,37 +317,14 @@ Options:
fmt.Fprintln(stderr, "fatal: ARVADOS_API_HOST and ARVADOS_API_TOKEN environment variables are not set")
return 1
}
- insecure := os.Getenv("ARVADOS_API_HOST_INSECURE")
- rpcconn := rpc.NewConn("",
- &url.URL{
- Scheme: "https",
- Host: os.Getenv("ARVADOS_API_HOST"),
- },
- insecure == "1" || insecure == "yes" || insecure == "true",
- func(context.Context) ([]string, error) {
- return []string{os.Getenv("ARVADOS_API_TOKEN")}, nil
- })
- if strings.Contains(targetUUID, "-xvhdp-") {
- crs, err := rpcconn.ContainerRequestList(context.TODO(), arvados.ListOptions{Limit: -1, Filters: []arvados.Filter{{"uuid", "=", targetUUID}}})
- if err != nil {
- fmt.Fprintln(stderr, err)
- return 1
- }
- if len(crs.Items) < 1 {
- fmt.Fprintf(stderr, "container request %q not found\n", targetUUID)
- return 1
- }
- cr := crs.Items[0]
- if cr.ContainerUUID == "" {
- fmt.Fprintf(stderr, "no container assigned, container request state is %s\n", strings.ToLower(string(cr.State)))
- return 1
- }
- targetUUID = cr.ContainerUUID
- fmt.Fprintln(stderr, "connecting to container", targetUUID)
- } else if !strings.Contains(targetUUID, "-dz642-") {
- fmt.Fprintf(stderr, "target UUID is not a container or container request UUID: %s\n", targetUUID)
+ rpcconn := rpcFromEnv()
+ targetUUID, err := resolveToContainerUUID(rpcconn, targetUUID)
+ if err != nil {
+ fmt.Fprintln(stderr, err)
return 1
}
+ fmt.Fprintln(stderr, "connecting to container", targetUUID)
+
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sshconn, err := rpcconn.ContainerSSH(ctx, arvados.ContainerSSHOptions{
@@ -199,3 +364,38 @@ Options:
func shellescape(s string) string {
return "'" + strings.Replace(s, "'", "'\\''", -1) + "'"
}
+
+func rpcFromEnv() *rpc.Conn {
+ insecure := os.Getenv("ARVADOS_API_HOST_INSECURE")
+ return rpc.NewConn("",
+ &url.URL{
+ Scheme: "https",
+ Host: os.Getenv("ARVADOS_API_HOST"),
+ },
+ insecure == "1" || insecure == "yes" || insecure == "true",
+ func(context.Context) ([]string, error) {
+ return []string{os.Getenv("ARVADOS_API_TOKEN")}, nil
+ })
+}
+
+func resolveToContainerUUID(rpcconn *rpc.Conn, targetUUID string) (string, error) {
+ switch {
+ case strings.Contains(targetUUID, "-dz642-"):
+ return targetUUID, nil
+ case strings.Contains(targetUUID, "-xvhdp-"):
+ crs, err := rpcconn.ContainerRequestList(context.TODO(), arvados.ListOptions{Limit: -1, Filters: []arvados.Filter{{"uuid", "=", targetUUID}}})
+ if err != nil {
+ return "", err
+ }
+ if len(crs.Items) < 1 {
+ return "", fmt.Errorf("container request %q not found\n", targetUUID)
+ }
+ cr := crs.Items[0]
+ if cr.ContainerUUID == "" {
+ return "", fmt.Errorf("no container assigned, container request state is %s\n", strings.ToLower(string(cr.State)))
+ }
+ return cr.ContainerUUID, nil
+ default:
+ return "", fmt.Errorf("target UUID is not a container or container request UUID: %s\n", targetUUID)
+ }
+}
diff --git a/cmd/arvados-client/container_gateway_test.go b/cmd/arvados-client/container_gateway_test.go
index 743b91d69..0e5aad709 100644
--- a/cmd/arvados-client/container_gateway_test.go
+++ b/cmd/arvados-client/container_gateway_test.go
@@ -10,6 +10,7 @@ import (
"crypto/hmac"
"crypto/sha256"
"fmt"
+ "io"
"io/ioutil"
"net"
"net/http"
@@ -24,9 +25,11 @@ import (
"git.arvados.org/arvados.git/lib/controller/rpc"
"git.arvados.org/arvados.git/lib/crunchrun"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/httpserver"
+ "git.arvados.org/arvados.git/sdk/go/keepclient"
check "gopkg.in/check.v1"
)
@@ -178,3 +181,133 @@ func (s *ClientSuite) TestShellGateway(c *check.C) {
}
wg.Wait()
}
+
+func (s *ClientSuite) TestContainerLog(c *check.C) {
+ arvadostest.StartKeep(2, true)
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(20*time.Second))
+ defer cancel()
+
+ rpcconn := rpc.NewConn("",
+ &url.URL{
+ Scheme: "https",
+ Host: os.Getenv("ARVADOS_API_HOST"),
+ },
+ true,
+ func(context.Context) ([]string, error) {
+ return []string{arvadostest.SystemRootToken}, nil
+ })
+ imageColl, err := rpcconn.CollectionCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{
+ "manifest_text": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855.tar\n",
+ }})
+ c.Assert(err, check.IsNil)
+ c.Logf("imageColl %+v", imageColl)
+ cr, err := rpcconn.ContainerRequestCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{
+ "state": "Committed",
+ "command": []string{"echo", fmt.Sprintf("%d", time.Now().Unix())},
+ "container_image": imageColl.PortableDataHash,
+ "cwd": "/",
+ "output_path": "/",
+ "priority": 1,
+ "runtime_constraints": arvados.RuntimeConstraints{
+ VCPUs: 1,
+ RAM: 1000000000,
+ },
+ "container_count_max": 1,
+ }})
+ c.Assert(err, check.IsNil)
+ h := hmac.New(sha256.New, []byte(arvadostest.SystemRootToken))
+ fmt.Fprint(h, cr.ContainerUUID)
+ authSecret := fmt.Sprintf("%x", h.Sum(nil))
+
+ coll := arvados.Collection{}
+ client := arvados.NewClientFromEnv()
+ ac, err := arvadosclient.New(client)
+ c.Assert(err, check.IsNil)
+ kc, err := keepclient.MakeKeepClient(ac)
+ c.Assert(err, check.IsNil)
+ cfs, err := coll.FileSystem(client, kc)
+ c.Assert(err, check.IsNil)
+
+ c.Log("running logs command on queued container")
+ var stdout, stderr bytes.Buffer
+ cmd := exec.CommandContext(ctx, "go", "run", ".", "logs", "-poll=250ms", cr.UUID)
+ cmd.Env = append(cmd.Env, os.Environ()...)
+ cmd.Env = append(cmd.Env, "ARVADOS_API_TOKEN="+arvadostest.SystemRootToken)
+ cmd.Stdout = io.MultiWriter(&stdout, os.Stderr)
+ cmd.Stderr = io.MultiWriter(&stderr, os.Stderr)
+ err = cmd.Start()
+ c.Assert(err, check.Equals, nil)
+
+ c.Log("changing container state to Locked")
+ _, err = rpcconn.ContainerUpdate(ctx, arvados.UpdateOptions{UUID: cr.ContainerUUID, Attrs: map[string]interface{}{
+ "state": arvados.ContainerStateLocked,
+ }})
+ c.Assert(err, check.IsNil)
+ c.Log("starting gateway")
+ gw := crunchrun.Gateway{
+ ContainerUUID: cr.ContainerUUID,
+ Address: "0.0.0.0:0",
+ AuthSecret: authSecret,
+ Log: ctxlog.TestLogger(c),
+ Target: crunchrun.GatewayTargetStub{},
+ LogCollection: cfs,
+ }
+ err = gw.Start()
+ c.Assert(err, check.IsNil)
+ c.Log("updating container gateway address")
+ _, err = rpcconn.ContainerUpdate(ctx, arvados.UpdateOptions{UUID: cr.ContainerUUID, Attrs: map[string]interface{}{
+ "gateway_address": gw.Address,
+ "state": arvados.ContainerStateRunning,
+ }})
+ c.Assert(err, check.IsNil)
+
+ fCrunchrun, err := cfs.OpenFile("crunch-run.txt", os.O_CREATE|os.O_WRONLY, 0777)
+ c.Assert(err, check.IsNil)
+ _, err = fmt.Fprintln(fCrunchrun, "line 1 of crunch-run.txt")
+ c.Assert(err, check.IsNil)
+ fStderr, err := cfs.OpenFile("stderr.txt", os.O_CREATE|os.O_WRONLY, 0777)
+ c.Assert(err, check.IsNil)
+ _, err = fmt.Fprintln(fStderr, "line 1 of stderr")
+ c.Assert(err, check.IsNil)
+ time.Sleep(time.Second * 2)
+ _, err = fmt.Fprintln(fCrunchrun, "line 2 of crunch-run.txt")
+ c.Assert(err, check.IsNil)
+ _, err = fmt.Fprintln(fStderr, "--end--")
+ c.Assert(err, check.IsNil)
+
+ for deadline := time.Now().Add(20 * time.Second); time.Now().Before(deadline) && !strings.Contains(stdout.String(), "--end--"); time.Sleep(time.Second / 10) {
+ }
+ c.Check(stdout.String(), check.Matches, `(?ms).*--end--\n.*`)
+
+ mtxt, err := cfs.MarshalManifest(".")
+ c.Assert(err, check.IsNil)
+ savedLog, err := rpcconn.CollectionCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{
+ "manifest_text": mtxt,
+ }})
+ c.Assert(err, check.IsNil)
+ _, err = rpcconn.ContainerUpdate(ctx, arvados.UpdateOptions{UUID: cr.ContainerUUID, Attrs: map[string]interface{}{
+ "state": arvados.ContainerStateComplete,
+ "log": savedLog.PortableDataHash,
+ "output": "d41d8cd98f00b204e9800998ecf8427e+0",
+ "exit_code": 0,
+ }})
+ c.Assert(err, check.IsNil)
+
+ err = cmd.Wait()
+ c.Check(err, check.IsNil)
+ // Ensure controller doesn't cheat by fetching data from the
+ // gateway after the container is complete.
+ gw.LogCollection = nil
+
+ c.Logf("re-running logs command on completed container")
+ {
+ ctx, cancel := context.WithDeadline(ctx, time.Now().Add(time.Second*5))
+ defer cancel()
+ cmd := exec.CommandContext(ctx, "go", "run", ".", "logs", cr.UUID)
+ cmd.Env = append(cmd.Env, os.Environ()...)
+ cmd.Env = append(cmd.Env, "ARVADOS_API_TOKEN="+arvadostest.SystemRootToken)
+ buf, err := cmd.CombinedOutput()
+ c.Check(err, check.Equals, nil)
+ c.Check(string(buf), check.Matches, `(?ms).*--end--\n`)
+ }
+}
diff --git a/lib/controller/rpc/conn.go b/lib/controller/rpc/conn.go
index 70a936a6f..9856eb576 100644
--- a/lib/controller/rpc/conn.go
+++ b/lib/controller/rpc/conn.go
@@ -340,6 +340,12 @@ func (conn *Conn) ContainerUnlock(ctx context.Context, options arvados.GetOption
}
func (conn *Conn) ContainerLog(ctx context.Context, options arvados.ContainerLogOptions) (resp http.Handler, err error) {
+ tokens, err := conn.tokenProvider(ctx)
+ if err != nil {
+ return nil, err
+ } else if len(tokens) < 1 {
+ return nil, httpserver.ErrorWithStatus(errors.New("unauthorized"), http.StatusUnauthorized)
+ }
proxy := &httputil.ReverseProxy{
Transport: conn.httpClient.Transport,
Director: func(r *http.Request) {
@@ -347,6 +353,7 @@ func (conn *Conn) ContainerLog(ctx context.Context, options arvados.ContainerLog
u.Path = r.URL.Path
u.RawQuery = fmt.Sprintf("no_forward=%v", options.NoForward)
r.URL = &u
+ r.Header.Set("Authorization", "Bearer "+tokens[0])
},
}
return proxy, nil
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list