[arvados] created: 2.5.0-110-g35e9b21d8
git repository hosting git at public.arvados.org
Fri Feb 10 15:37:48 UTC 2023
at 35e9b21d80569fec8860596213c72d199f79a593 (commit)
commit 35e9b21d80569fec8860596213c72d199f79a593
Author: Tom Clegg <tom at curii.com>
Date: Thu Feb 9 11:11:22 2023 -0500
19973: Limit container concurrency when API is returning 503.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
index 3403c50c9..270c6d43d 100644
--- a/lib/dispatchcloud/dispatcher.go
+++ b/lib/dispatchcloud/dispatcher.go
@@ -193,7 +193,7 @@ func (disp *dispatcher) run() {
if pollInterval <= 0 {
pollInterval = defaultPollInterval
}
- sched := scheduler.New(disp.Context, disp.queue, disp.pool, disp.Registry, staleLockTimeout, pollInterval)
+ sched := scheduler.New(disp.Context, disp.ArvClient, disp.queue, disp.pool, disp.Registry, staleLockTimeout, pollInterval)
sched.Start()
defer sched.Stop()
diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go
index f729f0dc2..cfd95e945 100644
--- a/lib/dispatchcloud/scheduler/run_queue.go
+++ b/lib/dispatchcloud/scheduler/run_queue.go
@@ -13,6 +13,8 @@ import (
"github.com/sirupsen/logrus"
)
+var quietAfter503 = time.Minute
+
func (sch *Scheduler) runQueue() {
unsorted, _ := sch.queue.Entries()
sorted := make([]container.QueueEnt, 0, len(unsorted))
@@ -35,15 +37,47 @@ func (sch *Scheduler) runQueue() {
running := sch.pool.Running()
unalloc := sch.pool.Unallocated()
+ if t := sch.client.Last503(); t.After(sch.last503time) {
+ // API has sent an HTTP 503 response since last time
+ // we checked. Use current #containers - 1 as
+ // maxConcurrency, i.e., try to stay just below the
+ // level where we see 503s.
+ sch.last503time = t
+ if newlimit := len(running) - 1; newlimit < 1 {
+ sch.maxConcurrency = 1
+ } else {
+ sch.maxConcurrency = newlimit
+ }
+ } else if sch.maxConcurrency > 0 && time.Since(sch.last503time) > quietAfter503 {
+ // If we haven't seen any 503 errors lately, raise
+ // limit to ~10% beyond the current workload.
+ //
+ // As we use the added 10% to schedule more
+ // containers, len(running) will increase and we'll
+ // push the limit up further. Soon enough,
+ // maxConcurrency will get high enough to schedule the
+ // entire queue, hit pool quota, or get 503s again.
+ max := len(running)*11/10 + 1
+ if sch.maxConcurrency < max {
+ sch.maxConcurrency = max
+ }
+ }
+
sch.logger.WithFields(logrus.Fields{
- "Containers": len(sorted),
- "Processes": len(running),
+ "Containers": len(sorted),
+ "Processes": len(running),
+ "maxConcurrency": sch.maxConcurrency,
}).Debug("runQueue")
dontstart := map[arvados.InstanceType]bool{}
var overquota []container.QueueEnt // entries that are unmappable because of worker pool quota
var containerAllocatedWorkerBootingCount int
+ // trying is #containers running + #containers we're trying to
+ // start. We stop trying to start more containers if this
+ // reaches the dynamic maxConcurrency limit.
+ trying := len(running)
+
tryrun:
for i, ctr := range sorted {
ctr, it := ctr.Container, ctr.InstanceType
@@ -56,8 +90,14 @@ tryrun:
}
switch ctr.State {
case arvados.ContainerStateQueued:
+ if sch.maxConcurrency > 0 && trying >= sch.maxConcurrency {
+ logger.Tracef("not locking: already at maxConcurrency %d", sch.maxConcurrency)
+ overquota = sorted[i:]
+ break tryrun
+ }
+ trying++
if unalloc[it] < 1 && sch.pool.AtQuota() {
- logger.Debug("not locking: AtQuota and no unalloc workers")
+ logger.Trace("not locking: AtQuota and no unalloc workers")
overquota = sorted[i:]
break tryrun
}
@@ -68,6 +108,12 @@ tryrun:
go sch.lockContainer(logger, ctr.UUID)
unalloc[it]--
case arvados.ContainerStateLocked:
+ if sch.maxConcurrency > 0 && trying >= sch.maxConcurrency {
+ logger.Debugf("not starting: already at maxConcurrency %d", sch.maxConcurrency)
+ overquota = sorted[i:]
+ break tryrun
+ }
+ trying++
if unalloc[it] > 0 {
unalloc[it]--
} else if sch.pool.AtQuota() {
@@ -115,10 +161,13 @@ tryrun:
if len(overquota) > 0 {
// Unlock any containers that are unmappable while
- // we're at quota.
+ // we're at quota (but if they have already been
+ // scheduled and they're loading docker images etc.,
+ // let them run).
for _, ctr := range overquota {
ctr := ctr.Container
- if ctr.State == arvados.ContainerStateLocked {
+ _, toolate := running[ctr.UUID]
+ if ctr.State == arvados.ContainerStateLocked && !toolate {
logger := sch.logger.WithField("ContainerUUID", ctr.UUID)
logger.Debug("unlock because pool capacity is used by higher priority containers")
err := sch.queue.Unlock(ctr.UUID)
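The policy added to runQueue() above can be read in isolation: on a freshly observed 503, clamp maxConcurrency to one below the number of running containers (never below 1); after a full quiet minute with no new 503s, let the limit grow to roughly 10% above the current workload. The following standalone sketch restates that arithmetic outside the scheduler; the adjustLimit function and limits struct are invented here for illustration and are not part of the patch.

package main

import (
	"fmt"
	"time"
)

const quietAfter503 = time.Minute

type limits struct {
	maxConcurrency int       // 0 = unlimited
	last503time    time.Time // last time the API responded 503
}

// adjustLimit applies the same policy as the patched runQueue(): on a
// fresh 503, clamp the limit to one below the current workload (minimum
// 1); once a quiet minute has passed, allow ~10% headroom above the
// current workload so the limit can climb back up.
func adjustLimit(l limits, running int, last503, now time.Time) limits {
	if last503.After(l.last503time) {
		l.last503time = last503
		if newlimit := running - 1; newlimit < 1 {
			l.maxConcurrency = 1
		} else {
			l.maxConcurrency = newlimit
		}
	} else if l.maxConcurrency > 0 && now.Sub(l.last503time) > quietAfter503 {
		if max := running*11/10 + 1; l.maxConcurrency < max {
			l.maxConcurrency = max
		}
	}
	return l
}

func main() {
	now := time.Now()
	l := limits{}
	l = adjustLimit(l, 40, now, now) // a 503 just arrived with 40 containers running
	fmt.Println(l.maxConcurrency)    // 39
	l = adjustLimit(l, 39, now, now.Add(2*time.Minute)) // two quiet minutes later
	fmt.Println(l.maxConcurrency)    // 39*11/10+1 = 43
}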
diff --git a/lib/dispatchcloud/scheduler/scheduler.go b/lib/dispatchcloud/scheduler/scheduler.go
index c3e67dd11..589aa3ec1 100644
--- a/lib/dispatchcloud/scheduler/scheduler.go
+++ b/lib/dispatchcloud/scheduler/scheduler.go
@@ -31,6 +31,7 @@ import (
// shuts down idle workers, in case they are consuming quota.
type Scheduler struct {
logger logrus.FieldLogger
+ client *arvados.Client
queue ContainerQueue
pool WorkerPool
reg *prometheus.Registry
@@ -45,6 +46,9 @@ type Scheduler struct {
stop chan struct{}
stopped chan struct{}
+ last503time time.Time // last time API responded 503
+ maxConcurrency int // dynamic container limit (0 = unlimited), see runQueue()
+
mContainersAllocatedNotStarted prometheus.Gauge
mContainersNotAllocatedOverQuota prometheus.Gauge
mLongestWaitTimeSinceQueue prometheus.Gauge
@@ -54,9 +58,10 @@ type Scheduler struct {
//
// Any given queue and pool should not be used by more than one
// scheduler at a time.
-func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
+func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
sch := &Scheduler{
logger: ctxlog.FromContext(ctx),
+ client: client,
queue: queue,
pool: pool,
reg: reg,
diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go
index b400a8474..5ace3b9d8 100644
--- a/sdk/go/arvados/client.go
+++ b/sdk/go/arvados/client.go
@@ -23,6 +23,7 @@ import (
"os"
"regexp"
"strings"
+ "sync/atomic"
"time"
"git.arvados.org/arvados.git/sdk/go/httpserver"
@@ -82,6 +83,8 @@ type Client struct {
// provided by http.Transport) when concurrent calls are
// multiplexed on a single http2 connection.
requestLimiter
+
+ last503 atomic.Value
}
// InsecureHTTPClient is the default http.Client used by a Client with
@@ -275,6 +278,9 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
resp, err := c.httpClient().Do(req)
c.requestLimiter.Report(resp, err)
+ if err == nil && resp.StatusCode == http.StatusServiceUnavailable {
+ c.last503.Store(time.Now())
+ }
if err == nil {
// We need to call cancel() eventually, but we can't
// use "defer cancel()" because the context has to
@@ -287,6 +293,13 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
return resp, err
}
+// Last503 returns the time of the most recent HTTP 503 (Service
+// Unavailable) response. Zero time indicates never.
+func (c *Client) Last503() time.Time {
+ t, _ := c.last503.Load().(time.Time)
+ return t
+}
+
// cancelOnClose calls a provided CancelFunc when its wrapped
// ReadCloser's Close() method is called.
type cancelOnClose struct {
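Taken together, this commit lets the dispatcher notice API-side overload: the client records the time of the most recent 503, and runQueue() compares it against the last value it acted on. A minimal, hypothetical consumer of the new accessor might look like the sketch below; it assumes a client configured from the environment and uses an illustrative polling loop, which is not how dispatchcloud itself is wired.

package main

import (
	"log"
	"time"

	"git.arvados.org/arvados.git/sdk/go/arvados"
)

func main() {
	// Assumes ARVADOS_API_HOST and ARVADOS_API_TOKEN are set in the
	// environment, as NewClientFromEnv expects.
	client := arvados.NewClientFromEnv()

	var seen time.Time
	for range time.Tick(10 * time.Second) {
		// Last503 returns the zero time until the client has seen at
		// least one 503; comparing against the last value we acted on
		// detects only new 503s.
		if t := client.Last503(); t.After(seen) {
			seen = t
			log.Printf("API returned 503 at %v; backing off", t)
		}
	}
}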
commit 69dcc65ee6b5832f7671c68051dd792981369da0
Author: Tom Clegg <tom at curii.com>
Date: Wed Feb 8 17:04:25 2023 -0500
19973: Limit arvados.Client request concurrency.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go
index 5a498b01f..b400a8474 100644
--- a/sdk/go/arvados/client.go
+++ b/sdk/go/arvados/client.go
@@ -76,6 +76,12 @@ type Client struct {
// APIHost and AuthToken were loaded from ARVADOS_* env vars
// (used to customize "no host/token" error messages)
loadedFromEnv bool
+
+ // Track/limit concurrent outgoing API calls. Note this
+ // differs from an outgoing connection limit (a feature
+ // provided by http.Transport) when concurrent calls are
+ // multiplexed on a single http2 connection.
+ requestLimiter
}
// InsecureHTTPClient is the default http.Client used by a Client with
@@ -220,10 +226,12 @@ func NewClientFromEnv() *Client {
var reqIDGen = httpserver.IDGenerator{Prefix: "req-"}
-// Do adds Authorization and X-Request-Id headers and then calls
+// Do adds Authorization and X-Request-Id headers, delays in order to
+// comply with rate-limiting restrictions, and then calls
// (*http.Client)Do().
func (c *Client) Do(req *http.Request) (*http.Response, error) {
- if auth, _ := req.Context().Value(contextKeyAuthorization{}).(string); auth != "" {
+ ctx := req.Context()
+ if auth, _ := ctx.Value(contextKeyAuthorization{}).(string); auth != "" {
req.Header.Add("Authorization", auth)
} else if c.AuthToken != "" {
req.Header.Add("Authorization", "OAuth2 "+c.AuthToken)
@@ -231,7 +239,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
if req.Header.Get("X-Request-Id") == "" {
var reqid string
- if ctxreqid, _ := req.Context().Value(contextKeyRequestID{}).(string); ctxreqid != "" {
+ if ctxreqid, _ := ctx.Value(contextKeyRequestID{}).(string); ctxreqid != "" {
reqid = ctxreqid
} else if c.defaultRequestID != "" {
reqid = c.defaultRequestID
@@ -246,18 +254,34 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
}
var cancel context.CancelFunc
if c.Timeout > 0 {
- ctx := req.Context()
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(c.Timeout))
req = req.WithContext(ctx)
+ } else {
+ cancel = context.CancelFunc(func() {})
+ }
+
+ c.requestLimiter.Acquire(ctx)
+ if ctx.Err() != nil {
+ c.requestLimiter.Release()
+ return nil, ctx.Err()
}
+
+ // Attach Release() to cancel func, see cancelOnClose below.
+ cancelOrig := cancel
+ cancel = func() {
+ c.requestLimiter.Release()
+ cancelOrig()
+ }
+
resp, err := c.httpClient().Do(req)
- if err == nil && cancel != nil {
+ c.requestLimiter.Report(resp, err)
+ if err == nil {
// We need to call cancel() eventually, but we can't
// use "defer cancel()" because the context has to
// stay alive until the caller has finished reading
// the response body.
resp.Body = cancelOnClose{ReadCloser: resp.Body, cancel: cancel}
- } else if cancel != nil {
+ } else {
cancel()
}
return resp, err
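The notable wiring above is that requestLimiter.Release() is folded into the request's cancel function, and that cancel function rides along with the response body via cancelOnClose, so the slot stays held until the caller has finished reading the reply. A reduced, hypothetical sketch of that pattern follows; releaseOnClose and doLimited are invented names, and the real code additionally calls Report() and the original context cancel.

package clientlimit

import (
	"io"
	"net/http"
)

// releaseOnClose mirrors the cancelOnClose idea from the patch: run a
// cleanup function when the response body is closed, so a concurrency
// slot is held for as long as the caller is still reading the reply.
type releaseOnClose struct {
	io.ReadCloser
	release func()
}

func (r releaseOnClose) Close() error {
	err := r.ReadCloser.Close()
	r.release()
	return err
}

// doLimited is a hypothetical wrapper showing the shape of the patched
// (*Client)Do(): take a slot before sending, give it back immediately on
// error, and otherwise give it back only when the body is closed.
func doLimited(c *http.Client, req *http.Request, acquire, release func()) (*http.Response, error) {
	acquire()
	resp, err := c.Do(req)
	if err != nil {
		release()
		return nil, err
	}
	resp.Body = releaseOnClose{ReadCloser: resp.Body, release: release}
	return resp, nil
}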
diff --git a/sdk/go/arvados/limiter.go b/sdk/go/arvados/limiter.go
new file mode 100644
index 000000000..ecbe0b180
--- /dev/null
+++ b/sdk/go/arvados/limiter.go
@@ -0,0 +1,130 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvados
+
+import (
+ "context"
+ "net/http"
+ "sync"
+ "time"
+)
+
+var requestLimiterQuietPeriod = time.Second
+
+type requestLimiter struct {
+ current int64
+ limit int64
+ lock sync.Mutex
+ cond *sync.Cond
+ quietUntil time.Time
+}
+
+// Acquire reserves one request slot, waiting if necessary.
+//
+// Acquire returns early if ctx cancels before a slot is available. It
+// is assumed in this case the caller will immediately notice
+// ctx.Err() != nil and call Release().
+func (rl *requestLimiter) Acquire(ctx context.Context) {
+ rl.lock.Lock()
+ if rl.cond == nil {
+ // First use of requestLimiter. Initialize.
+ rl.cond = sync.NewCond(&rl.lock)
+ }
+ // Wait out the quiet period(s) immediately following a 503.
+ for ctx.Err() == nil {
+ delay := rl.quietUntil.Sub(time.Now())
+ if delay < 0 {
+ break
+ }
+ // Wait for the end of the quiet period, which started
+ // when we last received a 503 response.
+ rl.lock.Unlock()
+ timer := time.NewTimer(delay)
+ select {
+ case <-timer.C:
+ case <-ctx.Done():
+ timer.Stop()
+ }
+ rl.lock.Lock()
+ }
+ ready := make(chan struct{})
+ go func() {
+ // close ready when a slot is available _or_ we wake
+ // up and find ctx has been canceled (meaning Acquire
+ // has already returned, or is about to).
+ for rl.limit > 0 && rl.limit <= rl.current && ctx.Err() == nil {
+ rl.cond.Wait()
+ }
+ close(ready)
+ }()
+ select {
+ case <-ready:
+ // Wait() returned, so we have the lock.
+ rl.current++
+ rl.lock.Unlock()
+ case <-ctx.Done():
+ // When Wait() returns the lock to our goroutine
+ // (which might have already happened) we need to
+ // release it (if we don't do this now, the following
+ // Lock() can deadlock).
+ go func() {
+ <-ready
+ rl.lock.Unlock()
+ }()
+ // Note we may have current > limit until the caller
+ // calls Release().
+ rl.lock.Lock()
+ rl.current++
+ rl.lock.Unlock()
+ }
+}
+
+// Release releases a slot that has been reserved with Acquire.
+func (rl *requestLimiter) Release() {
+ rl.lock.Lock()
+ rl.current--
+ rl.lock.Unlock()
+ rl.cond.Signal()
+}
+
+// Report uses the return values from (*http.Client)Do() to adjust the
+// outgoing request limit (increase on success, decrease on 503).
+func (rl *requestLimiter) Report(resp *http.Response, err error) {
+ if err != nil {
+ return
+ }
+ rl.lock.Lock()
+ defer rl.lock.Unlock()
+ if resp.StatusCode == http.StatusServiceUnavailable {
+ if rl.limit == 0 {
+ // Concurrency was unlimited until now.
+ // Calculate new limit based on actual
+ // concurrency instead of previous limit.
+ rl.limit = rl.current
+ }
+ if time.Now().After(rl.quietUntil) {
+ // Reduce concurrency limit by half.
+ rl.limit = (rl.limit + 1) / 2
+ // Don't start any new calls (or reduce the
+ // limit even further on additional 503s) for
+ // a second.
+ rl.quietUntil = time.Now().Add(requestLimiterQuietPeriod)
+ }
+ } else if resp.StatusCode >= 200 && resp.StatusCode < 400 && rl.limit > 0 {
+ // After each non-server-error response, increase
+ // concurrency limit by at least 10% -- but not beyond
+ // 2x the highest concurrency level we've seen without
+ // a failure.
+ increase := rl.limit / 10
+ if increase < 1 {
+ increase = 1
+ }
+ rl.limit += increase
+ if max := rl.current * 2; max > rl.limit {
+ rl.limit = max
+ }
+ rl.cond.Broadcast()
+ }
+}
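Numerically, the 503 branch of Report() above behaves like the following hypothetical walk-through. The figures are invented, and in the real limiter each halving only takes effect if the 503 lands outside the one-second quiet period started by the previous one.

package main

import "fmt"

func main() {
	current := int64(40) // requests in flight when the first 503 arrives
	limit := int64(0)    // 0 = unlimited, as in requestLimiter

	for i := 0; i < 8; i++ {
		if limit == 0 {
			// Concurrency was unlimited until now; start from the
			// observed concurrency, as Report() does.
			limit = current
		}
		// Halve, rounding up, so the limit bottoms out at 1 rather than 0.
		limit = (limit + 1) / 2
		fmt.Println("limit after 503:", limit)
	}
	// Prints 20, 10, 5, 3, 2, 1, 1, 1.
}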
diff --git a/sdk/go/arvados/limiter_test.go b/sdk/go/arvados/limiter_test.go
new file mode 100644
index 000000000..d1f5e837a
--- /dev/null
+++ b/sdk/go/arvados/limiter_test.go
@@ -0,0 +1,107 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvados
+
+import (
+ "context"
+ "errors"
+ "net/http"
+ "sync"
+ "time"
+
+ . "gopkg.in/check.v1"
+)
+
+var _ = Suite(&limiterSuite{})
+
+type limiterSuite struct{}
+
+func (*limiterSuite) TestLimiter(c *C) {
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
+ defer cancel()
+ rl := requestLimiter{}
+
+ // unlimited concurrency before first call to Report()
+ {
+ var wg sync.WaitGroup
+ wg.Add(1000)
+ for i := 0; i < 1000; i++ {
+ go func() {
+ rl.Acquire(ctx)
+ wg.Done()
+ }()
+ }
+ wg.Wait()
+ c.Check(rl.current, Equals, int64(1000))
+ wg.Add(1000)
+ for i := 0; i < 1000; i++ {
+ go func() {
+ rl.Release()
+ wg.Done()
+ }()
+ }
+ wg.Wait()
+ c.Check(rl.current, Equals, int64(0))
+ }
+
+ // context cancels while waiting for Acquire
+ {
+ rl.limit = 1
+ rl.Acquire(ctx)
+ ctx, cancel := context.WithDeadline(ctx, time.Now().Add(time.Millisecond))
+ defer cancel()
+ rl.Acquire(ctx)
+ c.Check(rl.current, Equals, int64(2))
+ c.Check(ctx.Err(), NotNil)
+ rl.Release()
+ rl.Release()
+ c.Check(rl.current, Equals, int64(0))
+ }
+
+ // Use a short quiet period to make tests faster
+ defer func(orig time.Duration) { requestLimiterQuietPeriod = orig }(requestLimiterQuietPeriod)
+ requestLimiterQuietPeriod = time.Second / 10
+
+ // Immediately after a 503, limit is decreased, and Acquire()
+ // waits for a quiet period
+ {
+ rl.limit = 0
+ for i := 0; i < 5; i++ {
+ rl.Acquire(ctx)
+ }
+ ctx, cancel := context.WithDeadline(ctx, time.Now().Add(requestLimiterQuietPeriod/10))
+ defer cancel()
+ rl.Report(&http.Response{StatusCode: http.StatusServiceUnavailable}, nil)
+ c.Check(rl.limit, Equals, int64(3))
+
+ for i := 0; i < 5; i++ {
+ rl.Release()
+ }
+
+ // Even with all slots released, we can't Acquire in
+ // the quiet period.
+ acquire := time.Now()
+ rl.Acquire(ctx)
+ c.Check(ctx.Err(), Equals, context.DeadlineExceeded)
+ c.Check(time.Since(acquire) < requestLimiterQuietPeriod/2, Equals, true)
+ c.Check(rl.quietUntil.Sub(time.Now()) > requestLimiterQuietPeriod/2, Equals, true)
+ rl.Release()
+ }
+
+ // Acquire waits for the quiet period to expire.
+ {
+ ctx, cancel := context.WithDeadline(ctx, time.Now().Add(requestLimiterQuietPeriod*2))
+ defer cancel()
+ acquire := time.Now()
+ rl.Acquire(ctx)
+ c.Check(time.Since(acquire) > requestLimiterQuietPeriod/10, Equals, true)
+ c.Check(time.Since(acquire) < requestLimiterQuietPeriod, Equals, true)
+ c.Check(ctx.Err(), IsNil)
+ rl.Release()
+ }
+
+ // OK to call Report() with nil Response and non-nil error.
+ rl.Report(nil, errors.New("network error"))
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list