[arvados] created: 2.6.0-468-g9a857a4f8
git repository hosting
git at public.arvados.org
Wed Aug 9 15:48:00 UTC 2023
at 9a857a4f86dc4cbe5d8214bfb3ea9e8d4dac6a76 (commit)
commit 9a857a4f86dc4cbe5d8214bfb3ea9e8d4dac6a76
Author: Tom Clegg <tom at curii.com>
Date: Mon Aug 7 15:26:20 2023 -0400
20457: When passing prior quota level, raise maxConcurrency slowly.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go
index 0c4634d75..9817101bb 100644
--- a/lib/dispatchcloud/scheduler/run_queue.go
+++ b/lib/dispatchcloud/scheduler/run_queue.go
@@ -92,7 +92,13 @@ func (sch *Scheduler) runQueue() {
if sch.maxInstances > 0 && sch.maxConcurrency > sch.maxInstances {
sch.maxConcurrency = sch.maxInstances
}
- if sch.pool.AtQuota() && len(running) > 0 && (sch.maxConcurrency == 0 || sch.maxConcurrency > len(running)) {
+ instances := len(running) + len(unalloc)
+ if sch.instancesWithinQuota > 0 && sch.instancesWithinQuota < instances {
+ // Evidently it is possible to run this many
+ // instances, so raise our estimate.
+ sch.instancesWithinQuota = instances
+ }
+ if sch.pool.AtQuota() {
// Consider current workload to be the maximum
// allowed, for the sake of reporting metrics and
// calculating max supervisors.
@@ -103,7 +109,27 @@ func (sch *Scheduler) runQueue() {
// supervisors when we reach the cloud-imposed quota
// (which may be based on # CPUs etc) long before the
// configured MaxInstances.
- sch.maxConcurrency = len(running)
+ if sch.maxConcurrency == 0 || sch.maxConcurrency > instances {
+ if instances == 0 {
+ sch.maxConcurrency = 1
+ } else {
+ sch.maxConcurrency = instances
+ }
+ }
+ sch.instancesWithinQuota = instances
+ } else if sch.instancesWithinQuota > 0 && sch.maxConcurrency > sch.instancesWithinQuota+1 {
+ // Once we've hit a quota error and started tracking
+ // instancesWithinQuota (i.e., it's not zero), we
+ // avoid exceeding that known-working level by more
+ // than 1.
+ //
+ // If we don't do this, we risk entering a pattern of
+ // repeatedly locking several containers, hitting
+ // quota again, and unlocking them again each time the
+ // driver stops reporting AtQuota, which tends to use
+ // up the max lock/unlock cycles on the next few
+ // containers in the queue, and cause them to fail.
+ sch.maxConcurrency = sch.instancesWithinQuota + 1
}
sch.mMaxContainerConcurrency.Set(float64(sch.maxConcurrency))
diff --git a/lib/dispatchcloud/scheduler/scheduler.go b/lib/dispatchcloud/scheduler/scheduler.go
index 1db12279d..ee7ab5088 100644
--- a/lib/dispatchcloud/scheduler/scheduler.go
+++ b/lib/dispatchcloud/scheduler/scheduler.go
@@ -46,10 +46,11 @@ type Scheduler struct {
stop chan struct{}
stopped chan struct{}
- last503time time.Time // last time API responded 503
- maxConcurrency int // dynamic container limit (0 = unlimited), see runQueue()
- supervisorFraction float64 // maximum fraction of "supervisor" containers (these are containers who's main job is to launch other containers, e.g. workflow runners)
- maxInstances int // maximum number of instances the pool will bring up (0 = unlimited)
+ last503time time.Time // last time API responded 503
+ maxConcurrency int // dynamic container limit (0 = unlimited), see runQueue()
+ supervisorFraction float64 // maximum fraction of "supervisor" containers (these are containers who's main job is to launch other containers, e.g. workflow runners)
+ maxInstances int // maximum number of instances the pool will bring up (0 = unlimited)
+ instancesWithinQuota int // max concurrency achieved since last quota error (0 = no quota error yet)
mContainersAllocatedNotStarted prometheus.Gauge
mContainersNotAllocatedOverQuota prometheus.Gauge
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 1d600e370..f79bad98f 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -173,19 +173,20 @@ type Pool struct {
runnerArgs []string // extra args passed to crunch-run
// private state
- subscribers map[<-chan struct{}]chan<- struct{}
- creating map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
- workers map[cloud.InstanceID]*worker
- loaded bool // loaded list of instances from InstanceSet at least once
- exited map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
- atQuotaUntil time.Time
- atQuotaErr cloud.QuotaError
- stop chan bool
- mtx sync.RWMutex
- setupOnce sync.Once
- runnerData []byte
- runnerMD5 [md5.Size]byte
- runnerCmd string
+ subscribers map[<-chan struct{}]chan<- struct{}
+ creating map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
+ workers map[cloud.InstanceID]*worker
+ loaded bool // loaded list of instances from InstanceSet at least once
+ exited map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
+ atQuotaUntilFewerInstances int
+ atQuotaUntil time.Time
+ atQuotaErr cloud.QuotaError
+ stop chan bool
+ mtx sync.RWMutex
+ setupOnce sync.Once
+ runnerData []byte
+ runnerMD5 [md5.Size]byte
+ runnerCmd string
mContainersRunning prometheus.Gauge
mInstances *prometheus.GaugeVec
@@ -322,6 +323,7 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
wp.mtx.Lock()
defer wp.mtx.Unlock()
if time.Now().Before(wp.atQuotaUntil) ||
+ wp.atQuotaUntilFewerInstances > 0 ||
wp.instanceSet.throttleCreate.Error() != nil ||
(wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating)) {
return false
@@ -360,8 +362,24 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
if err != nil {
if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
wp.atQuotaErr = err
- wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
- time.AfterFunc(quotaErrorTTL, wp.notify)
+ n := len(wp.workers) + len(wp.creating) - 1
+ if n < 1 {
+ // Quota error with no
+ // instances running --
+ // nothing to do but wait
+ wp.atQuotaUntilFewerInstances = 0
+ wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
+ time.AfterFunc(quotaErrorTTL, wp.notify)
+ logger.WithField("atQuotaUntil", wp.atQuotaUntil).Info("quota error with 0 running -- waiting for quotaErrorTTL")
+ } else if n < wp.atQuotaUntilFewerInstances || wp.atQuotaUntilFewerInstances == 0 {
+ // Quota error with N
+ // instances running -- report
+ // AtQuota until some
+ // instances shut down
+ wp.atQuotaUntilFewerInstances = n
+ wp.atQuotaUntil = time.Time{}
+ logger.WithField("atQuotaUntilFewerInstances", n).Info("quota error -- waiting for next instance shutdown")
+ }
}
logger.WithError(err).Error("create failed")
wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
@@ -381,7 +399,9 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
func (wp *Pool) AtQuota() bool {
wp.mtx.Lock()
defer wp.mtx.Unlock()
- return time.Now().Before(wp.atQuotaUntil) || (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating))
+ return wp.atQuotaUntilFewerInstances > 0 ||
+ time.Now().Before(wp.atQuotaUntil) ||
+ (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating))
}
// SetIdleBehavior determines how the indicated instance will behave
@@ -1032,6 +1052,10 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
notify = true
}
+ if wp.atQuotaUntilFewerInstances > len(wp.workers)+len(wp.creating) {
+ wp.atQuotaUntilFewerInstances = 0
+ }
+
if !wp.loaded {
notify = true
wp.loaded = true
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list