[arvados] created: 2.5.0-110-g35e9b21d8

git repository hosting git at public.arvados.org
Fri Feb 10 15:37:48 UTC 2023


        at  35e9b21d80569fec8860596213c72d199f79a593 (commit)


commit 35e9b21d80569fec8860596213c72d199f79a593
Author: Tom Clegg <tom at curii.com>
Date:   Thu Feb 9 11:11:22 2023 -0500

    19973: Limit container concurrency when API is returning 503.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
index 3403c50c9..270c6d43d 100644
--- a/lib/dispatchcloud/dispatcher.go
+++ b/lib/dispatchcloud/dispatcher.go
@@ -193,7 +193,7 @@ func (disp *dispatcher) run() {
 	if pollInterval <= 0 {
 		pollInterval = defaultPollInterval
 	}
-	sched := scheduler.New(disp.Context, disp.queue, disp.pool, disp.Registry, staleLockTimeout, pollInterval)
+	sched := scheduler.New(disp.Context, disp.ArvClient, disp.queue, disp.pool, disp.Registry, staleLockTimeout, pollInterval)
 	sched.Start()
 	defer sched.Stop()
 
diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go
index f729f0dc2..cfd95e945 100644
--- a/lib/dispatchcloud/scheduler/run_queue.go
+++ b/lib/dispatchcloud/scheduler/run_queue.go
@@ -13,6 +13,8 @@ import (
 	"github.com/sirupsen/logrus"
 )
 
+var quietAfter503 = time.Minute
+
 func (sch *Scheduler) runQueue() {
 	unsorted, _ := sch.queue.Entries()
 	sorted := make([]container.QueueEnt, 0, len(unsorted))
@@ -35,15 +37,47 @@ func (sch *Scheduler) runQueue() {
 	running := sch.pool.Running()
 	unalloc := sch.pool.Unallocated()
 
+	if t := sch.client.Last503(); t.After(sch.last503time) {
+		// API has sent an HTTP 503 response since last time
+		// we checked. Use current #containers - 1 as
+		// maxConcurrency, i.e., try to stay just below the
+		// level where we see 503s.
+		sch.last503time = t
+		if newlimit := len(running) - 1; newlimit < 1 {
+			sch.maxConcurrency = 1
+		} else {
+			sch.maxConcurrency = newlimit
+		}
+	} else if sch.maxConcurrency > 0 && time.Since(sch.last503time) > quietAfter503 {
+		// If we haven't seen any 503 errors lately, raise
+		// limit to ~10% beyond the current workload.
+		//
+		// As we use the added 10% to schedule more
+		// containers, len(running) will increase and we'll
+		// push the limit up further. Soon enough,
+		// maxConcurrency will get high enough to schedule the
+		// entire queue, hit pool quota, or get 503s again.
+		max := len(running)*11/10 + 1
+		if sch.maxConcurrency < max {
+			sch.maxConcurrency = max
+		}
+	}
+
 	sch.logger.WithFields(logrus.Fields{
-		"Containers": len(sorted),
-		"Processes":  len(running),
+		"Containers":     len(sorted),
+		"Processes":      len(running),
+		"maxConcurrency": sch.maxConcurrency,
 	}).Debug("runQueue")
 
 	dontstart := map[arvados.InstanceType]bool{}
 	var overquota []container.QueueEnt // entries that are unmappable because of worker pool quota
 	var containerAllocatedWorkerBootingCount int
 
+	// trying is #containers running + #containers we're trying to
+	// start. We stop trying to start more containers if this
+	// reaches the dynamic maxConcurrency limit.
+	trying := len(running)
+
 tryrun:
 	for i, ctr := range sorted {
 		ctr, it := ctr.Container, ctr.InstanceType
@@ -56,8 +90,14 @@ tryrun:
 		}
 		switch ctr.State {
 		case arvados.ContainerStateQueued:
+			if sch.maxConcurrency > 0 && trying >= sch.maxConcurrency {
+				logger.Tracef("not locking: already at maxConcurrency %d", sch.maxConcurrency)
+				overquota = sorted[i:]
+				break tryrun
+			}
+			trying++
 			if unalloc[it] < 1 && sch.pool.AtQuota() {
-				logger.Debug("not locking: AtQuota and no unalloc workers")
+				logger.Trace("not locking: AtQuota and no unalloc workers")
 				overquota = sorted[i:]
 				break tryrun
 			}
@@ -68,6 +108,12 @@ tryrun:
 			go sch.lockContainer(logger, ctr.UUID)
 			unalloc[it]--
 		case arvados.ContainerStateLocked:
+			if sch.maxConcurrency > 0 && trying >= sch.maxConcurrency {
+				logger.Debugf("not starting: already at maxConcurrency %d", sch.maxConcurrency)
+				overquota = sorted[i:]
+				break tryrun
+			}
+			trying++
 			if unalloc[it] > 0 {
 				unalloc[it]--
 			} else if sch.pool.AtQuota() {
@@ -115,10 +161,13 @@ tryrun:
 
 	if len(overquota) > 0 {
 		// Unlock any containers that are unmappable while
-		// we're at quota.
+		// we're at quota (but if they have already been
+		// scheduled and they're loading docker images etc.,
+		// let them run).
 		for _, ctr := range overquota {
 			ctr := ctr.Container
-			if ctr.State == arvados.ContainerStateLocked {
+			_, toolate := running[ctr.UUID]
+			if ctr.State == arvados.ContainerStateLocked && !toolate {
 				logger := sch.logger.WithField("ContainerUUID", ctr.UUID)
 				logger.Debug("unlock because pool capacity is used by higher priority containers")
 				err := sch.queue.Unlock(ctr.UUID)
diff --git a/lib/dispatchcloud/scheduler/scheduler.go b/lib/dispatchcloud/scheduler/scheduler.go
index c3e67dd11..589aa3ec1 100644
--- a/lib/dispatchcloud/scheduler/scheduler.go
+++ b/lib/dispatchcloud/scheduler/scheduler.go
@@ -31,6 +31,7 @@ import (
 // shuts down idle workers, in case they are consuming quota.
 type Scheduler struct {
 	logger              logrus.FieldLogger
+	client              *arvados.Client
 	queue               ContainerQueue
 	pool                WorkerPool
 	reg                 *prometheus.Registry
@@ -45,6 +46,9 @@ type Scheduler struct {
 	stop    chan struct{}
 	stopped chan struct{}
 
+	last503time    time.Time // last time API responded 503
+	maxConcurrency int       // dynamic container limit (0 = unlimited), see runQueue()
+
 	mContainersAllocatedNotStarted   prometheus.Gauge
 	mContainersNotAllocatedOverQuota prometheus.Gauge
 	mLongestWaitTimeSinceQueue       prometheus.Gauge
@@ -54,9 +58,10 @@ type Scheduler struct {
 //
 // Any given queue and pool should not be used by more than one
 // scheduler at a time.
-func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
+func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
 	sch := &Scheduler{
 		logger:              ctxlog.FromContext(ctx),
+		client:              client,
 		queue:               queue,
 		pool:                pool,
 		reg:                 reg,
diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go
index b400a8474..5ace3b9d8 100644
--- a/sdk/go/arvados/client.go
+++ b/sdk/go/arvados/client.go
@@ -23,6 +23,7 @@ import (
 	"os"
 	"regexp"
 	"strings"
+	"sync/atomic"
 	"time"
 
 	"git.arvados.org/arvados.git/sdk/go/httpserver"
@@ -82,6 +83,8 @@ type Client struct {
 	// provided by http.Transport) when concurrent calls are
 	// multiplexed on a single http2 connection.
 	requestLimiter
+
+	last503 atomic.Value
 }
 
 // InsecureHTTPClient is the default http.Client used by a Client with
@@ -275,6 +278,9 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
 
 	resp, err := c.httpClient().Do(req)
 	c.requestLimiter.Report(resp, err)
+	if err == nil && resp.StatusCode == http.StatusServiceUnavailable {
+		c.last503.Store(time.Now())
+	}
 	if err == nil {
 		// We need to call cancel() eventually, but we can't
 		// use "defer cancel()" because the context has to
@@ -287,6 +293,13 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
 	return resp, err
 }
 
+// Last503 returns the time of the most recent HTTP 503 (Service
+// Unavailable) response. Zero time indicates never.
+func (c *Client) Last503() time.Time {
+	t, _ := c.last503.Load().(time.Time)
+	return t
+}
+
 // cancelOnClose calls a provided CancelFunc when its wrapped
 // ReadCloser's Close() method is called.
 type cancelOnClose struct {

commit 69dcc65ee6b5832f7671c68051dd792981369da0
Author: Tom Clegg <tom at curii.com>
Date:   Wed Feb 8 17:04:25 2023 -0500

    19973: Limit arvados.Client request concurrency.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go
index 5a498b01f..b400a8474 100644
--- a/sdk/go/arvados/client.go
+++ b/sdk/go/arvados/client.go
@@ -76,6 +76,12 @@ type Client struct {
 	// APIHost and AuthToken were loaded from ARVADOS_* env vars
 	// (used to customize "no host/token" error messages)
 	loadedFromEnv bool
+
+	// Track/limit concurrent outgoing API calls. Note this
+	// differs from an outgoing connection limit (a feature
+	// provided by http.Transport) when concurrent calls are
+	// multiplexed on a single http2 connection.
+	requestLimiter
 }
 
 // InsecureHTTPClient is the default http.Client used by a Client with
@@ -220,10 +226,12 @@ func NewClientFromEnv() *Client {
 
 var reqIDGen = httpserver.IDGenerator{Prefix: "req-"}
 
-// Do adds Authorization and X-Request-Id headers and then calls
+// Do adds Authorization and X-Request-Id headers, delays in order to
+// comply with rate-limiting restrictions, and then calls
 // (*http.Client)Do().
 func (c *Client) Do(req *http.Request) (*http.Response, error) {
-	if auth, _ := req.Context().Value(contextKeyAuthorization{}).(string); auth != "" {
+	ctx := req.Context()
+	if auth, _ := ctx.Value(contextKeyAuthorization{}).(string); auth != "" {
 		req.Header.Add("Authorization", auth)
 	} else if c.AuthToken != "" {
 		req.Header.Add("Authorization", "OAuth2 "+c.AuthToken)
@@ -231,7 +239,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
 
 	if req.Header.Get("X-Request-Id") == "" {
 		var reqid string
-		if ctxreqid, _ := req.Context().Value(contextKeyRequestID{}).(string); ctxreqid != "" {
+		if ctxreqid, _ := ctx.Value(contextKeyRequestID{}).(string); ctxreqid != "" {
 			reqid = ctxreqid
 		} else if c.defaultRequestID != "" {
 			reqid = c.defaultRequestID
@@ -246,18 +254,34 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
 	}
 	var cancel context.CancelFunc
 	if c.Timeout > 0 {
-		ctx := req.Context()
 		ctx, cancel = context.WithDeadline(ctx, time.Now().Add(c.Timeout))
 		req = req.WithContext(ctx)
+	} else {
+		cancel = context.CancelFunc(func() {})
+	}
+
+	c.requestLimiter.Acquire(ctx)
+	if ctx.Err() != nil {
+		c.requestLimiter.Release()
+		return nil, ctx.Err()
 	}
+
+	// Attach Release() to cancel func, see cancelOnClose below.
+	cancelOrig := cancel
+	cancel = func() {
+		c.requestLimiter.Release()
+		cancelOrig()
+	}
+
 	resp, err := c.httpClient().Do(req)
-	if err == nil && cancel != nil {
+	c.requestLimiter.Report(resp, err)
+	if err == nil {
 		// We need to call cancel() eventually, but we can't
 		// use "defer cancel()" because the context has to
 		// stay alive until the caller has finished reading
 		// the response body.
 		resp.Body = cancelOnClose{ReadCloser: resp.Body, cancel: cancel}
-	} else if cancel != nil {
+	} else {
 		cancel()
 	}
 	return resp, err
diff --git a/sdk/go/arvados/limiter.go b/sdk/go/arvados/limiter.go
new file mode 100644
index 000000000..ecbe0b180
--- /dev/null
+++ b/sdk/go/arvados/limiter.go
@@ -0,0 +1,130 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvados
+
+import (
+	"context"
+	"net/http"
+	"sync"
+	"time"
+)
+
+var requestLimiterQuietPeriod = time.Second
+
+type requestLimiter struct {
+	current    int64
+	limit      int64
+	lock       sync.Mutex
+	cond       *sync.Cond
+	quietUntil time.Time
+}
+
+// Acquire reserves one request slot, waiting if necessary.
+//
+// Acquire returns early if ctx cancels before a slot is available. A
+// slot is still counted in that case, so the caller is expected to
+// notice ctx.Err() != nil immediately and call Release().
+func (rl *requestLimiter) Acquire(ctx context.Context) {
+	rl.lock.Lock()
+	if rl.cond == nil {
+		// First use of requestLimiter. Initialize.
+		rl.cond = sync.NewCond(&rl.lock)
+	}
+	// Wait out the quiet period(s) immediately following a 503.
+	for ctx.Err() == nil {
+		delay := rl.quietUntil.Sub(time.Now())
+		if delay < 0 {
+			break
+		}
+		// Wait for the end of the quiet period, which started
+		// when we last received a 503 response.
+		rl.lock.Unlock()
+		timer := time.NewTimer(delay)
+		select {
+		case <-timer.C:
+		case <-ctx.Done():
+			timer.Stop()
+		}
+		rl.lock.Lock()
+	}
+	ready := make(chan struct{})
+	go func() {
+		// close ready when a slot is available _or_ we wake
+		// up and find ctx has been canceled (meaning Acquire
+		// has already returned, or is about to).
+		for rl.limit > 0 && rl.limit <= rl.current && ctx.Err() == nil {
+			rl.cond.Wait()
+		}
+		close(ready)
+	}()
+	select {
+	case <-ready:
+		// Wait() returned, so we have the lock.
+		rl.current++
+		rl.lock.Unlock()
+	case <-ctx.Done():
+		// When Wait() returns the lock to our goroutine
+		// (which might have already happened) we need to
+		// release it (if we don't do this now, the following
+		// Lock() can deadlock).
+		go func() {
+			<-ready
+			rl.lock.Unlock()
+		}()
+		// Note we may have current > limit until the caller
+		// calls Release().
+		rl.lock.Lock()
+		rl.current++
+		rl.lock.Unlock()
+	}
+}
+
+// Release releases a slot that has been reserved with Acquire.
+func (rl *requestLimiter) Release() {
+	rl.lock.Lock()
+	rl.current--
+	rl.lock.Unlock()
+	rl.cond.Signal()
+}
+
+// Report uses the return values from (*http.Client)Do() to adjust the
+// outgoing request limit (increase on success, decrease on 503).
+func (rl *requestLimiter) Report(resp *http.Response, err error) {
+	if err != nil {
+		return
+	}
+	rl.lock.Lock()
+	defer rl.lock.Unlock()
+	if resp.StatusCode == http.StatusServiceUnavailable {
+		if rl.limit == 0 {
+			// Concurrency was unlimited until now.
+			// Calculate new limit based on actual
+			// concurrency instead of previous limit.
+			rl.limit = rl.current
+		}
+		if time.Now().After(rl.quietUntil) {
+			// Reduce concurrency limit by half.
+			rl.limit = (rl.limit + 1) / 2
+			// Don't start any new calls (or reduce the
+			// limit even further on additional 503s) until
+			// requestLimiterQuietPeriod (1s default) has passed.
+			rl.quietUntil = time.Now().Add(requestLimiterQuietPeriod)
+		}
+	} else if resp.StatusCode >= 200 && resp.StatusCode < 400 && rl.limit > 0 {
+		// After each non-server-error response, increase
+		// concurrency limit by at least 10% -- intended to stay
+		// within 2x the current concurrency level. NOTE(review):
+		// the "max > rl.limit" check below raises it, not caps it.
+		increase := rl.limit / 10
+		if increase < 1 {
+			increase = 1
+		}
+		rl.limit += increase
+		if max := rl.current * 2; max > rl.limit {
+			rl.limit = max
+		}
+		rl.cond.Broadcast()
+	}
+}
diff --git a/sdk/go/arvados/limiter_test.go b/sdk/go/arvados/limiter_test.go
new file mode 100644
index 000000000..d1f5e837a
--- /dev/null
+++ b/sdk/go/arvados/limiter_test.go
@@ -0,0 +1,107 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvados
+
+import (
+	"context"
+	"errors"
+	"net/http"
+	"sync"
+	"time"
+
+	. "gopkg.in/check.v1"
+)
+
+var _ = Suite(&limiterSuite{})
+
+type limiterSuite struct{}
+
+func (*limiterSuite) TestLimiter(c *C) {
+	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
+	defer cancel()
+	rl := requestLimiter{}
+
+	// unlimited concurrency before first call to Report()
+	{
+		var wg sync.WaitGroup
+		wg.Add(1000)
+		for i := 0; i < 1000; i++ {
+			go func() {
+				rl.Acquire(ctx)
+				wg.Done()
+			}()
+		}
+		wg.Wait()
+		c.Check(rl.current, Equals, int64(1000))
+		wg.Add(1000)
+		for i := 0; i < 1000; i++ {
+			go func() {
+				rl.Release()
+				wg.Done()
+			}()
+		}
+		wg.Wait()
+		c.Check(rl.current, Equals, int64(0))
+	}
+
+	// context cancels while waiting for Acquire
+	{
+		rl.limit = 1
+		rl.Acquire(ctx)
+		ctx, cancel := context.WithDeadline(ctx, time.Now().Add(time.Millisecond))
+		defer cancel()
+		rl.Acquire(ctx)
+		c.Check(rl.current, Equals, int64(2))
+		c.Check(ctx.Err(), NotNil)
+		rl.Release()
+		rl.Release()
+		c.Check(rl.current, Equals, int64(0))
+	}
+
+	// Use a short quiet period to make tests faster
+	defer func(orig time.Duration) { requestLimiterQuietPeriod = orig }(requestLimiterQuietPeriod)
+	requestLimiterQuietPeriod = time.Second / 10
+
+	// Immediately after a 503, limit is decreased, and Acquire()
+	// waits for a quiet period
+	{
+		rl.limit = 0
+		for i := 0; i < 5; i++ {
+			rl.Acquire(ctx)
+		}
+		ctx, cancel := context.WithDeadline(ctx, time.Now().Add(requestLimiterQuietPeriod/10))
+		defer cancel()
+		rl.Report(&http.Response{StatusCode: http.StatusServiceUnavailable}, nil)
+		c.Check(rl.limit, Equals, int64(3))
+
+		for i := 0; i < 5; i++ {
+			rl.Release()
+		}
+
+		// Even with all slots released, we can't Acquire in
+		// the quiet period.
+		acquire := time.Now()
+		rl.Acquire(ctx)
+		c.Check(ctx.Err(), Equals, context.DeadlineExceeded)
+		c.Check(time.Since(acquire) < requestLimiterQuietPeriod/2, Equals, true)
+		c.Check(rl.quietUntil.Sub(time.Now()) > requestLimiterQuietPeriod/2, Equals, true)
+		rl.Release()
+	}
+
+	// Acquire waits for the quiet period to expire.
+	{
+		ctx, cancel := context.WithDeadline(ctx, time.Now().Add(requestLimiterQuietPeriod*2))
+		defer cancel()
+		acquire := time.Now()
+		rl.Acquire(ctx)
+		c.Check(time.Since(acquire) > requestLimiterQuietPeriod/10, Equals, true)
+		c.Check(time.Since(acquire) < requestLimiterQuietPeriod, Equals, true)
+		c.Check(ctx.Err(), IsNil)
+		rl.Release()
+	}
+
+	// OK to call Report() with nil Response and non-nil error.
+	rl.Report(nil, errors.New("network error"))
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list