[arvados] updated: 2.7.0-36-g1dd06b9c84

git repository hosting git at public.arvados.org
Wed Dec 6 16:59:16 UTC 2023


Summary of changes:
 build/run-tests.sh                       |  1 +
 lib/boot/supervisor.go                   | 23 ++++++++++-----
 lib/controller/localdb/container_test.go |  1 +
 sdk/go/arvados/client.go                 | 43 ++++++++++++++++++++++-----
 sdk/go/keepclient/keepclient.go          | 21 ++++++++++++++
 services/keep-balance/server.go          |  4 +--
 services/keepproxy/keepproxy.go          | 50 ++++++++++++++++++++------------
 services/keepstore/proxy_remote.go       |  4 +--
 services/keepstore/pull_worker.go        |  4 +--
 9 files changed, 112 insertions(+), 39 deletions(-)

       via  1dd06b9c849b4cf20b6f21e6004d5b11234cf348 (commit)
      from  318684addb994eae446bdc9907bd5d44c8f58e2a (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list them in
full below.


commit 1dd06b9c849b4cf20b6f21e6004d5b11234cf348
Author: Tom Clegg <tom at curii.com>
Date:   Wed Dec 6 11:55:52 2023 -0500

    Merge branch '21227-keep-web-panic'
    
    fixes #21227
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/build/run-tests.sh b/build/run-tests.sh
index 586b724b69..28051318b2 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -1000,6 +1000,7 @@ test_gofmt() {
     cd "$WORKSPACE" || return 1
     dirs=$(ls -d */ | egrep -v 'vendor|tmp')
     [[ -z "$(gofmt -e -d $dirs | tee -a /dev/stderr)" ]]
+    go vet -composites=false ./...
 }
 
 test_services/api() {
diff --git a/lib/boot/supervisor.go b/lib/boot/supervisor.go
index 55fe9d2e37..19649b755b 100644
--- a/lib/boot/supervisor.go
+++ b/lib/boot/supervisor.go
@@ -112,7 +112,7 @@ func (super *Supervisor) Start(ctx context.Context) {
 	super.ctx, super.cancel = context.WithCancel(ctx)
 	super.done = make(chan struct{})
 
-	sigch := make(chan os.Signal)
+	sigch := make(chan os.Signal, 1)
 	signal.Notify(sigch, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
 	go func() {
 		defer signal.Stop(sigch)
@@ -205,15 +205,24 @@ func (super *Supervisor) Wait() error {
 func (super *Supervisor) startFederation(cfg *arvados.Config) {
 	super.children = map[string]*Supervisor{}
 	for id, cc := range cfg.Clusters {
-		super2 := *super
 		yaml, err := json.Marshal(arvados.Config{Clusters: map[string]arvados.Cluster{id: cc}})
 		if err != nil {
 			panic(fmt.Sprintf("json.Marshal partial config: %s", err))
 		}
-		super2.ConfigYAML = string(yaml)
-		super2.ConfigPath = "-"
-		super2.children = nil
-
+		super2 := &Supervisor{
+			ConfigPath:           "-",
+			ConfigYAML:           string(yaml),
+			SourcePath:           super.SourcePath,
+			SourceVersion:        super.SourceVersion,
+			ClusterType:          super.ClusterType,
+			ListenHost:           super.ListenHost,
+			ControllerAddr:       super.ControllerAddr,
+			NoWorkbench1:         super.NoWorkbench1,
+			NoWorkbench2:         super.NoWorkbench2,
+			OwnTemporaryDatabase: super.OwnTemporaryDatabase,
+			Stdin:                super.Stdin,
+			Stderr:               super.Stderr,
+		}
 		if super2.ClusterType == "test" {
 			super2.Stderr = &service.LogPrefixer{
 				Writer: super.Stderr,
@@ -221,7 +230,7 @@ func (super *Supervisor) startFederation(cfg *arvados.Config) {
 			}
 		}
 		super2.Start(super.ctx)
-		super.children[id] = &super2
+		super.children[id] = super2
 	}
 }
 
diff --git a/lib/controller/localdb/container_test.go b/lib/controller/localdb/container_test.go
index 65d9fac5bb..0a74d52807 100644
--- a/lib/controller/localdb/container_test.go
+++ b/lib/controller/localdb/container_test.go
@@ -204,6 +204,7 @@ func (s *containerSuite) TestUpdatePriorityMultiLevelWorkflow(c *C) {
 	defer deadlockCancel()
 	for _, cr := range allcrs {
 		if strings.Contains(cr.Command[2], " j ") && !strings.Contains(cr.Command[2], " k ") {
+			cr := cr
 			wg.Add(1)
 			go func() {
 				defer wg.Done()
diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go
index c2f6361334..e3c1432660 100644
--- a/sdk/go/arvados/client.go
+++ b/sdk/go/arvados/client.go
@@ -26,6 +26,7 @@ import (
 	"regexp"
 	"strconv"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"time"
 
@@ -88,7 +89,10 @@ type Client struct {
 	// differs from an outgoing connection limit (a feature
 	// provided by http.Transport) when concurrent calls are
 	// multiplexed on a single http2 connection.
-	requestLimiter requestLimiter
+	//
+	// getRequestLimiter() should always be used, because this can
+	// be nil.
+	requestLimiter *requestLimiter
 
 	last503 atomic.Value
 }
@@ -150,7 +154,7 @@ func NewClientFromConfig(cluster *Cluster) (*Client, error) {
 		APIHost:        ctrlURL.Host,
 		Insecure:       cluster.TLS.Insecure,
 		Timeout:        5 * time.Minute,
-		requestLimiter: requestLimiter{maxlimit: int64(cluster.API.MaxConcurrentRequests / 4)},
+		requestLimiter: &requestLimiter{maxlimit: int64(cluster.API.MaxConcurrentRequests / 4)},
 	}, nil
 }
 
@@ -291,7 +295,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
 	}
 	rclient.CheckRetry = func(ctx context.Context, resp *http.Response, respErr error) (bool, error) {
 		checkRetryCalled++
-		if c.requestLimiter.Report(resp, respErr) {
+		if c.getRequestLimiter().Report(resp, respErr) {
 			c.last503.Store(time.Now())
 		}
 		if c.Timeout == 0 {
@@ -321,9 +325,10 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
 	}
 	rclient.Logger = nil
 
-	c.requestLimiter.Acquire(ctx)
+	limiter := c.getRequestLimiter()
+	limiter.Acquire(ctx)
 	if ctx.Err() != nil {
-		c.requestLimiter.Release()
+		limiter.Release()
 		cancel()
 		return nil, ctx.Err()
 	}
@@ -342,7 +347,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
 		}
 	}
 	if err != nil {
-		c.requestLimiter.Release()
+		limiter.Release()
 		cancel()
 		return nil, err
 	}
@@ -352,7 +357,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
 	resp.Body = cancelOnClose{
 		ReadCloser: resp.Body,
 		cancel: func() {
-			c.requestLimiter.Release()
+			limiter.Release()
 			cancel()
 		},
 	}
@@ -366,6 +371,30 @@ func (c *Client) Last503() time.Time {
 	return t
 }
 
+// globalRequestLimiter entries (one for each APIHost) don't have a
+// hard limit on outgoing connections, but do add a delay and reduce
+// concurrency after 503 errors.
+var (
+	globalRequestLimiter     = map[string]*requestLimiter{}
+	globalRequestLimiterLock sync.Mutex
+)
+
+// Get this client's requestLimiter, or a global requestLimiter
+// singleton for c's APIHost if this client doesn't have its own.
+func (c *Client) getRequestLimiter() *requestLimiter {
+	if c.requestLimiter != nil {
+		return c.requestLimiter
+	}
+	globalRequestLimiterLock.Lock()
+	defer globalRequestLimiterLock.Unlock()
+	limiter := globalRequestLimiter[c.APIHost]
+	if limiter == nil {
+		limiter = &requestLimiter{}
+		globalRequestLimiter[c.APIHost] = limiter
+	}
+	return limiter
+}
+
 // cancelOnClose calls a provided CancelFunc when its wrapped
 // ReadCloser's Close() method is called.
 type cancelOnClose struct {
diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index 68ac886ddd..86001c01e0 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -120,6 +120,27 @@ type KeepClient struct {
 	disableDiscovery bool
 }
 
+func (kc *KeepClient) Clone() *KeepClient {
+	kc.lock.Lock()
+	defer kc.lock.Unlock()
+	return &KeepClient{
+		Arvados:               kc.Arvados,
+		Want_replicas:         kc.Want_replicas,
+		localRoots:            kc.localRoots,
+		writableLocalRoots:    kc.writableLocalRoots,
+		gatewayRoots:          kc.gatewayRoots,
+		HTTPClient:            kc.HTTPClient,
+		Retries:               kc.Retries,
+		BlockCache:            kc.BlockCache,
+		RequestID:             kc.RequestID,
+		StorageClasses:        kc.StorageClasses,
+		DefaultStorageClasses: kc.DefaultStorageClasses,
+		replicasPerService:    kc.replicasPerService,
+		foundNonDiskSvc:       kc.foundNonDiskSvc,
+		disableDiscovery:      kc.disableDiscovery,
+	}
+}
+
 func (kc *KeepClient) loadDefaultClasses() error {
 	scData, err := kc.Arvados.ClusterConfig("StorageClasses")
 	if err != nil {
diff --git a/services/keep-balance/server.go b/services/keep-balance/server.go
index b20144e3af..480791ffa2 100644
--- a/services/keep-balance/server.go
+++ b/services/keep-balance/server.go
@@ -98,9 +98,7 @@ func (srv *Server) runForever(ctx context.Context) error {
 
 	ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
 
-	// The unbuffered channel here means we only hear SIGUSR1 if
-	// it arrives while we're waiting in select{}.
-	sigUSR1 := make(chan os.Signal)
+	sigUSR1 := make(chan os.Signal, 1)
 	signal.Notify(sigUSR1, syscall.SIGUSR1)
 
 	logger.Info("acquiring service lock")
diff --git a/services/keepproxy/keepproxy.go b/services/keepproxy/keepproxy.go
index 2090c50686..a79883147b 100644
--- a/services/keepproxy/keepproxy.go
+++ b/services/keepproxy/keepproxy.go
@@ -175,13 +175,18 @@ func (h *proxyHandler) checkAuthorizationHeader(req *http.Request) (pass bool, t
 	return true, tok, user
 }
 
-// We need to make a private copy of the default http transport early
-// in initialization, then make copies of our private copy later. It
-// won't be safe to copy http.DefaultTransport itself later, because
-// its private mutexes might have already been used. (Without this,
-// the test suite sometimes panics "concurrent map writes" in
-// net/http.(*Transport).removeIdleConnLocked().)
-var defaultTransport = *(http.DefaultTransport.(*http.Transport))
+// We can't copy the default http transport because http.Transport has
+// a mutex field, so we make our own using the values of the exported
+// fields.
+var defaultTransport = http.Transport{
+	Proxy:                 http.DefaultTransport.(*http.Transport).Proxy,
+	DialContext:           http.DefaultTransport.(*http.Transport).DialContext,
+	ForceAttemptHTTP2:     http.DefaultTransport.(*http.Transport).ForceAttemptHTTP2,
+	MaxIdleConns:          http.DefaultTransport.(*http.Transport).MaxIdleConns,
+	IdleConnTimeout:       http.DefaultTransport.(*http.Transport).IdleConnTimeout,
+	TLSHandshakeTimeout:   http.DefaultTransport.(*http.Transport).TLSHandshakeTimeout,
+	ExpectContinueTimeout: http.DefaultTransport.(*http.Transport).ExpectContinueTimeout,
+}
 
 type proxyHandler struct {
 	http.Handler
@@ -195,14 +200,23 @@ type proxyHandler struct {
 func newHandler(ctx context.Context, kc *keepclient.KeepClient, timeout time.Duration, cluster *arvados.Cluster) (service.Handler, error) {
 	rest := mux.NewRouter()
 
-	transport := defaultTransport
-	transport.DialContext = (&net.Dialer{
-		Timeout:   keepclient.DefaultConnectTimeout,
-		KeepAlive: keepclient.DefaultKeepAlive,
-		DualStack: true,
-	}).DialContext
-	transport.TLSClientConfig = arvadosclient.MakeTLSConfig(kc.Arvados.ApiInsecure)
-	transport.TLSHandshakeTimeout = keepclient.DefaultTLSHandshakeTimeout
+	// We can't copy the default http transport because
+	// http.Transport has a mutex field, so we copy the fields
+	// that we know have non-zero values in http.DefaultTransport.
+	transport := &http.Transport{
+		Proxy:                 http.DefaultTransport.(*http.Transport).Proxy,
+		ForceAttemptHTTP2:     http.DefaultTransport.(*http.Transport).ForceAttemptHTTP2,
+		MaxIdleConns:          http.DefaultTransport.(*http.Transport).MaxIdleConns,
+		IdleConnTimeout:       http.DefaultTransport.(*http.Transport).IdleConnTimeout,
+		ExpectContinueTimeout: http.DefaultTransport.(*http.Transport).ExpectContinueTimeout,
+		DialContext: (&net.Dialer{
+			Timeout:   keepclient.DefaultConnectTimeout,
+			KeepAlive: keepclient.DefaultKeepAlive,
+			DualStack: true,
+		}).DialContext,
+		TLSClientConfig:     arvadosclient.MakeTLSConfig(kc.Arvados.ApiInsecure),
+		TLSHandshakeTimeout: keepclient.DefaultTLSHandshakeTimeout,
+	}
 
 	cacheQ, err := lru.New2Q(500)
 	if err != nil {
@@ -213,7 +227,7 @@ func newHandler(ctx context.Context, kc *keepclient.KeepClient, timeout time.Dur
 		Handler:    rest,
 		KeepClient: kc,
 		timeout:    timeout,
-		transport:  &transport,
+		transport:  transport,
 		apiTokenCache: &apiTokenCache{
 			tokens:     cacheQ,
 			expireTime: 300,
@@ -566,7 +580,7 @@ func (h *proxyHandler) Index(resp http.ResponseWriter, req *http.Request) {
 }
 
 func (h *proxyHandler) makeKeepClient(req *http.Request) *keepclient.KeepClient {
-	kc := *h.KeepClient
+	kc := h.KeepClient.Clone()
 	kc.RequestID = req.Header.Get("X-Request-Id")
 	kc.HTTPClient = &proxyClient{
 		client: &http.Client{
@@ -575,5 +589,5 @@ func (h *proxyHandler) makeKeepClient(req *http.Request) *keepclient.KeepClient
 		},
 		proto: req.Proto,
 	}
-	return &kc
+	return kc
 }
diff --git a/services/keepstore/proxy_remote.go b/services/keepstore/proxy_remote.go
index 526bc25299..66a7b43751 100644
--- a/services/keepstore/proxy_remote.go
+++ b/services/keepstore/proxy_remote.go
@@ -130,14 +130,14 @@ func (rp *remoteProxy) remoteClient(remoteID string, remoteCluster arvados.Remot
 	}
 	accopy := *kc.Arvados
 	accopy.ApiToken = token
-	kccopy := *kc
+	kccopy := kc.Clone()
 	kccopy.Arvados = &accopy
 	token, err := auth.SaltToken(token, remoteID)
 	if err != nil {
 		return nil, err
 	}
 	kccopy.Arvados.ApiToken = token
-	return &kccopy, nil
+	return kccopy, nil
 }
 
 var localOrRemoteSignature = regexp.MustCompile(`\+[AR][^\+]*`)
diff --git a/services/keepstore/pull_worker.go b/services/keepstore/pull_worker.go
index abe3dc3857..b9194fe6f6 100644
--- a/services/keepstore/pull_worker.go
+++ b/services/keepstore/pull_worker.go
@@ -50,7 +50,7 @@ func (h *handler) pullItemAndProcess(pullRequest PullRequest) error {
 	// Make a private copy of keepClient so we can set
 	// ServiceRoots to the source servers specified in the pull
 	// request.
-	keepClient := *h.keepClient
+	keepClient := h.keepClient.Clone()
 	serviceRoots := make(map[string]string)
 	for _, addr := range pullRequest.Servers {
 		serviceRoots[addr] = addr
@@ -59,7 +59,7 @@ func (h *handler) pullItemAndProcess(pullRequest PullRequest) error {
 
 	signedLocator := SignLocator(h.Cluster, pullRequest.Locator, keepClient.Arvados.ApiToken, time.Now().Add(time.Minute))
 
-	reader, contentLen, _, err := GetContent(signedLocator, &keepClient)
+	reader, contentLen, _, err := GetContent(signedLocator, keepClient)
 	if err != nil {
 		return err
 	}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list