[arvados] updated: 2.7.0-36-g1dd06b9c84
git repository hosting
git at public.arvados.org
Wed Dec 6 16:59:16 UTC 2023
Summary of changes:
build/run-tests.sh | 1 +
lib/boot/supervisor.go | 23 ++++++++++-----
lib/controller/localdb/container_test.go | 1 +
sdk/go/arvados/client.go | 43 ++++++++++++++++++++++-----
sdk/go/keepclient/keepclient.go | 21 ++++++++++++++
services/keep-balance/server.go | 4 +--
services/keepproxy/keepproxy.go | 50 ++++++++++++++++++++------------
services/keepstore/proxy_remote.go | 4 +--
services/keepstore/pull_worker.go | 4 +--
9 files changed, 112 insertions(+), 39 deletions(-)
via 1dd06b9c849b4cf20b6f21e6004d5b11234cf348 (commit)
from 318684addb994eae446bdc9907bd5d44c8f58e2a (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email, so we list those
revisions in full below.
commit 1dd06b9c849b4cf20b6f21e6004d5b11234cf348
Author: Tom Clegg <tom at curii.com>
Date: Wed Dec 6 11:55:52 2023 -0500
Merge branch '21227-keep-web-panic'
fixes #21227
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/build/run-tests.sh b/build/run-tests.sh
index 586b724b69..28051318b2 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -1000,6 +1000,7 @@ test_gofmt() {
cd "$WORKSPACE" || return 1
dirs=$(ls -d */ | egrep -v 'vendor|tmp')
[[ -z "$(gofmt -e -d $dirs | tee -a /dev/stderr)" ]]
+ go vet -composites=false ./...
}
test_services/api() {
diff --git a/lib/boot/supervisor.go b/lib/boot/supervisor.go
index 55fe9d2e37..19649b755b 100644
--- a/lib/boot/supervisor.go
+++ b/lib/boot/supervisor.go
@@ -112,7 +112,7 @@ func (super *Supervisor) Start(ctx context.Context) {
super.ctx, super.cancel = context.WithCancel(ctx)
super.done = make(chan struct{})
- sigch := make(chan os.Signal)
+ sigch := make(chan os.Signal, 1)
signal.Notify(sigch, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
go func() {
defer signal.Stop(sigch)
@@ -205,15 +205,24 @@ func (super *Supervisor) Wait() error {
func (super *Supervisor) startFederation(cfg *arvados.Config) {
super.children = map[string]*Supervisor{}
for id, cc := range cfg.Clusters {
- super2 := *super
yaml, err := json.Marshal(arvados.Config{Clusters: map[string]arvados.Cluster{id: cc}})
if err != nil {
panic(fmt.Sprintf("json.Marshal partial config: %s", err))
}
- super2.ConfigYAML = string(yaml)
- super2.ConfigPath = "-"
- super2.children = nil
-
+ super2 := &Supervisor{
+ ConfigPath: "-",
+ ConfigYAML: string(yaml),
+ SourcePath: super.SourcePath,
+ SourceVersion: super.SourceVersion,
+ ClusterType: super.ClusterType,
+ ListenHost: super.ListenHost,
+ ControllerAddr: super.ControllerAddr,
+ NoWorkbench1: super.NoWorkbench1,
+ NoWorkbench2: super.NoWorkbench2,
+ OwnTemporaryDatabase: super.OwnTemporaryDatabase,
+ Stdin: super.Stdin,
+ Stderr: super.Stderr,
+ }
if super2.ClusterType == "test" {
super2.Stderr = &service.LogPrefixer{
Writer: super.Stderr,
@@ -221,7 +230,7 @@ func (super *Supervisor) startFederation(cfg *arvados.Config) {
}
}
super2.Start(super.ctx)
- super.children[id] = &super2
+ super.children[id] = super2
}
}
diff --git a/lib/controller/localdb/container_test.go b/lib/controller/localdb/container_test.go
index 65d9fac5bb..0a74d52807 100644
--- a/lib/controller/localdb/container_test.go
+++ b/lib/controller/localdb/container_test.go
@@ -204,6 +204,7 @@ func (s *containerSuite) TestUpdatePriorityMultiLevelWorkflow(c *C) {
defer deadlockCancel()
for _, cr := range allcrs {
if strings.Contains(cr.Command[2], " j ") && !strings.Contains(cr.Command[2], " k ") {
+ cr := cr
wg.Add(1)
go func() {
defer wg.Done()
diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go
index c2f6361334..e3c1432660 100644
--- a/sdk/go/arvados/client.go
+++ b/sdk/go/arvados/client.go
@@ -26,6 +26,7 @@ import (
"regexp"
"strconv"
"strings"
+ "sync"
"sync/atomic"
"time"
@@ -88,7 +89,10 @@ type Client struct {
// differs from an outgoing connection limit (a feature
// provided by http.Transport) when concurrent calls are
// multiplexed on a single http2 connection.
- requestLimiter requestLimiter
+ //
+ // getRequestLimiter() should always be used, because this can
+ // be nil.
+ requestLimiter *requestLimiter
last503 atomic.Value
}
@@ -150,7 +154,7 @@ func NewClientFromConfig(cluster *Cluster) (*Client, error) {
APIHost: ctrlURL.Host,
Insecure: cluster.TLS.Insecure,
Timeout: 5 * time.Minute,
- requestLimiter: requestLimiter{maxlimit: int64(cluster.API.MaxConcurrentRequests / 4)},
+ requestLimiter: &requestLimiter{maxlimit: int64(cluster.API.MaxConcurrentRequests / 4)},
}, nil
}
@@ -291,7 +295,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
}
rclient.CheckRetry = func(ctx context.Context, resp *http.Response, respErr error) (bool, error) {
checkRetryCalled++
- if c.requestLimiter.Report(resp, respErr) {
+ if c.getRequestLimiter().Report(resp, respErr) {
c.last503.Store(time.Now())
}
if c.Timeout == 0 {
@@ -321,9 +325,10 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
}
rclient.Logger = nil
- c.requestLimiter.Acquire(ctx)
+ limiter := c.getRequestLimiter()
+ limiter.Acquire(ctx)
if ctx.Err() != nil {
- c.requestLimiter.Release()
+ limiter.Release()
cancel()
return nil, ctx.Err()
}
@@ -342,7 +347,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
}
}
if err != nil {
- c.requestLimiter.Release()
+ limiter.Release()
cancel()
return nil, err
}
@@ -352,7 +357,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
resp.Body = cancelOnClose{
ReadCloser: resp.Body,
cancel: func() {
- c.requestLimiter.Release()
+ limiter.Release()
cancel()
},
}
@@ -366,6 +371,30 @@ func (c *Client) Last503() time.Time {
return t
}
+// globalRequestLimiter entries (one for each APIHost) don't have a
+// hard limit on outgoing connections, but do add a delay and reduce
+// concurrency after 503 errors.
+var (
+ globalRequestLimiter = map[string]*requestLimiter{}
+ globalRequestLimiterLock sync.Mutex
+)
+
+// Get this client's requestLimiter, or a global requestLimiter
+// singleton for c's APIHost if this client doesn't have its own.
+func (c *Client) getRequestLimiter() *requestLimiter {
+ if c.requestLimiter != nil {
+ return c.requestLimiter
+ }
+ globalRequestLimiterLock.Lock()
+ defer globalRequestLimiterLock.Unlock()
+ limiter := globalRequestLimiter[c.APIHost]
+ if limiter == nil {
+ limiter = &requestLimiter{}
+ globalRequestLimiter[c.APIHost] = limiter
+ }
+ return limiter
+}
+
// cancelOnClose calls a provided CancelFunc when its wrapped
// ReadCloser's Close() method is called.
type cancelOnClose struct {
diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index 68ac886ddd..86001c01e0 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -120,6 +120,27 @@ type KeepClient struct {
disableDiscovery bool
}
+func (kc *KeepClient) Clone() *KeepClient {
+ kc.lock.Lock()
+ defer kc.lock.Unlock()
+ return &KeepClient{
+ Arvados: kc.Arvados,
+ Want_replicas: kc.Want_replicas,
+ localRoots: kc.localRoots,
+ writableLocalRoots: kc.writableLocalRoots,
+ gatewayRoots: kc.gatewayRoots,
+ HTTPClient: kc.HTTPClient,
+ Retries: kc.Retries,
+ BlockCache: kc.BlockCache,
+ RequestID: kc.RequestID,
+ StorageClasses: kc.StorageClasses,
+ DefaultStorageClasses: kc.DefaultStorageClasses,
+ replicasPerService: kc.replicasPerService,
+ foundNonDiskSvc: kc.foundNonDiskSvc,
+ disableDiscovery: kc.disableDiscovery,
+ }
+}
+
func (kc *KeepClient) loadDefaultClasses() error {
scData, err := kc.Arvados.ClusterConfig("StorageClasses")
if err != nil {
diff --git a/services/keep-balance/server.go b/services/keep-balance/server.go
index b20144e3af..480791ffa2 100644
--- a/services/keep-balance/server.go
+++ b/services/keep-balance/server.go
@@ -98,9 +98,7 @@ func (srv *Server) runForever(ctx context.Context) error {
ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
- // The unbuffered channel here means we only hear SIGUSR1 if
- // it arrives while we're waiting in select{}.
- sigUSR1 := make(chan os.Signal)
+ sigUSR1 := make(chan os.Signal, 1)
signal.Notify(sigUSR1, syscall.SIGUSR1)
logger.Info("acquiring service lock")
diff --git a/services/keepproxy/keepproxy.go b/services/keepproxy/keepproxy.go
index 2090c50686..a79883147b 100644
--- a/services/keepproxy/keepproxy.go
+++ b/services/keepproxy/keepproxy.go
@@ -175,13 +175,18 @@ func (h *proxyHandler) checkAuthorizationHeader(req *http.Request) (pass bool, t
return true, tok, user
}
-// We need to make a private copy of the default http transport early
-// in initialization, then make copies of our private copy later. It
-// won't be safe to copy http.DefaultTransport itself later, because
-// its private mutexes might have already been used. (Without this,
-// the test suite sometimes panics "concurrent map writes" in
-// net/http.(*Transport).removeIdleConnLocked().)
-var defaultTransport = *(http.DefaultTransport.(*http.Transport))
+// We can't copy the default http transport because http.Transport has
+// a mutex field, so we make our own using the values of the exported
+// fields.
+var defaultTransport = http.Transport{
+ Proxy: http.DefaultTransport.(*http.Transport).Proxy,
+ DialContext: http.DefaultTransport.(*http.Transport).DialContext,
+ ForceAttemptHTTP2: http.DefaultTransport.(*http.Transport).ForceAttemptHTTP2,
+ MaxIdleConns: http.DefaultTransport.(*http.Transport).MaxIdleConns,
+ IdleConnTimeout: http.DefaultTransport.(*http.Transport).IdleConnTimeout,
+ TLSHandshakeTimeout: http.DefaultTransport.(*http.Transport).TLSHandshakeTimeout,
+ ExpectContinueTimeout: http.DefaultTransport.(*http.Transport).ExpectContinueTimeout,
+}
type proxyHandler struct {
http.Handler
@@ -195,14 +200,23 @@ type proxyHandler struct {
func newHandler(ctx context.Context, kc *keepclient.KeepClient, timeout time.Duration, cluster *arvados.Cluster) (service.Handler, error) {
rest := mux.NewRouter()
- transport := defaultTransport
- transport.DialContext = (&net.Dialer{
- Timeout: keepclient.DefaultConnectTimeout,
- KeepAlive: keepclient.DefaultKeepAlive,
- DualStack: true,
- }).DialContext
- transport.TLSClientConfig = arvadosclient.MakeTLSConfig(kc.Arvados.ApiInsecure)
- transport.TLSHandshakeTimeout = keepclient.DefaultTLSHandshakeTimeout
+ // We can't copy the default http transport because
+ // http.Transport has a mutex field, so we copy the fields
+ // that we know have non-zero values in http.DefaultTransport.
+ transport := &http.Transport{
+ Proxy: http.DefaultTransport.(*http.Transport).Proxy,
+ ForceAttemptHTTP2: http.DefaultTransport.(*http.Transport).ForceAttemptHTTP2,
+ MaxIdleConns: http.DefaultTransport.(*http.Transport).MaxIdleConns,
+ IdleConnTimeout: http.DefaultTransport.(*http.Transport).IdleConnTimeout,
+ ExpectContinueTimeout: http.DefaultTransport.(*http.Transport).ExpectContinueTimeout,
+ DialContext: (&net.Dialer{
+ Timeout: keepclient.DefaultConnectTimeout,
+ KeepAlive: keepclient.DefaultKeepAlive,
+ DualStack: true,
+ }).DialContext,
+ TLSClientConfig: arvadosclient.MakeTLSConfig(kc.Arvados.ApiInsecure),
+ TLSHandshakeTimeout: keepclient.DefaultTLSHandshakeTimeout,
+ }
cacheQ, err := lru.New2Q(500)
if err != nil {
@@ -213,7 +227,7 @@ func newHandler(ctx context.Context, kc *keepclient.KeepClient, timeout time.Dur
Handler: rest,
KeepClient: kc,
timeout: timeout,
- transport: &transport,
+ transport: transport,
apiTokenCache: &apiTokenCache{
tokens: cacheQ,
expireTime: 300,
@@ -566,7 +580,7 @@ func (h *proxyHandler) Index(resp http.ResponseWriter, req *http.Request) {
}
func (h *proxyHandler) makeKeepClient(req *http.Request) *keepclient.KeepClient {
- kc := *h.KeepClient
+ kc := h.KeepClient.Clone()
kc.RequestID = req.Header.Get("X-Request-Id")
kc.HTTPClient = &proxyClient{
client: &http.Client{
@@ -575,5 +589,5 @@ func (h *proxyHandler) makeKeepClient(req *http.Request) *keepclient.KeepClient
},
proto: req.Proto,
}
- return &kc
+ return kc
}
diff --git a/services/keepstore/proxy_remote.go b/services/keepstore/proxy_remote.go
index 526bc25299..66a7b43751 100644
--- a/services/keepstore/proxy_remote.go
+++ b/services/keepstore/proxy_remote.go
@@ -130,14 +130,14 @@ func (rp *remoteProxy) remoteClient(remoteID string, remoteCluster arvados.Remot
}
accopy := *kc.Arvados
accopy.ApiToken = token
- kccopy := *kc
+ kccopy := kc.Clone()
kccopy.Arvados = &accopy
token, err := auth.SaltToken(token, remoteID)
if err != nil {
return nil, err
}
kccopy.Arvados.ApiToken = token
- return &kccopy, nil
+ return kccopy, nil
}
var localOrRemoteSignature = regexp.MustCompile(`\+[AR][^\+]*`)
diff --git a/services/keepstore/pull_worker.go b/services/keepstore/pull_worker.go
index abe3dc3857..b9194fe6f6 100644
--- a/services/keepstore/pull_worker.go
+++ b/services/keepstore/pull_worker.go
@@ -50,7 +50,7 @@ func (h *handler) pullItemAndProcess(pullRequest PullRequest) error {
// Make a private copy of keepClient so we can set
// ServiceRoots to the source servers specified in the pull
// request.
- keepClient := *h.keepClient
+ keepClient := h.keepClient.Clone()
serviceRoots := make(map[string]string)
for _, addr := range pullRequest.Servers {
serviceRoots[addr] = addr
@@ -59,7 +59,7 @@ func (h *handler) pullItemAndProcess(pullRequest PullRequest) error {
signedLocator := SignLocator(h.Cluster, pullRequest.Locator, keepClient.Arvados.ApiToken, time.Now().Add(time.Minute))
- reader, contentLen, _, err := GetContent(signedLocator, &keepClient)
+ reader, contentLen, _, err := GetContent(signedLocator, keepClient)
if err != nil {
return err
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list