[arvados] created: 2.7.0-4998-gc29c2feb69
git repository hosting
git at public.arvados.org
Mon Oct 16 19:07:45 UTC 2023
at c29c2feb691b5abbffaf8f177222d2973ad1b19f (commit)
commit c29c2feb691b5abbffaf8f177222d2973ad1b19f
Author: Tom Clegg <tom at curii.com>
Date: Mon Oct 16 15:04:36 2023 -0400
20984: Handle "instance type not available" condition better.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/cloud/ec2/ec2.go b/lib/cloud/ec2/ec2.go
index 5e4df05f46..816df48d90 100644
--- a/lib/cloud/ec2/ec2.go
+++ b/lib/cloud/ec2/ec2.go
@@ -665,21 +665,36 @@ func (err rateLimitError) EarliestRetry() time.Time {
return err.earliestRetry
}
-var isCodeCapacity = map[string]bool{
+// capacityError wraps an EC2 API error reporting that the cloud
+// provider could not supply the requested capacity (wrapError uses it
+// for the "InsufficientInstanceCapacity" code). *capacityError
+// satisfies the cloud.CapacityError interface.
+type capacityError struct {
+ error
+ // True if the condition applies only to the instance type
+ // that was requested, rather than to all instance types.
+ isInstanceTypeSpecific bool
+}
+
+// IsCapacityError always returns true: every capacityError represents
+// a capacity condition, so callers should wait before retrying.
+func (er *capacityError) IsCapacityError() bool {
+ return true
+}
+
+// IsInstanceTypeSpecific reports whether the capacity condition is
+// limited to the requested instance type.
+func (er *capacityError) IsInstanceTypeSpecific() bool {
+ return er.isInstanceTypeSpecific
+}
+
+// isCodeQuota is the set of EC2 error codes that isErrorQuota treats
+// as quota/limit errors (wrapError turns these into ec2QuotaError).
+// "InsufficientInstanceCapacity" is no longer listed here: wrapError
+// handles it separately and returns an instance-type-specific
+// capacityError instead.
+var isCodeQuota = map[string]bool{
"InstanceLimitExceeded": true,
"InsufficientAddressCapacity": true,
"InsufficientFreeAddressesInSubnet": true,
- "InsufficientInstanceCapacity": true,
"InsufficientVolumeCapacity": true,
"MaxSpotInstanceCountExceeded": true,
"VcpuLimitExceeded": true,
}
-// isErrorCapacity returns whether the error is to be throttled based on its code.
+// isErrorQuota returns whether the error indicates we have reached
+// some usage quota/limit -- i.e., immediately retrying with an equal
+// or larger instance type will probably not work.
+//
// Returns false if error is nil.
-func isErrorCapacity(err error) bool {
+func isErrorQuota(err error) bool {
if aerr, ok := err.(awserr.Error); ok && aerr != nil {
- if _, ok := isCodeCapacity[aerr.Code()]; ok {
+ if _, ok := isCodeQuota[aerr.Code()]; ok {
return true
}
}
@@ -720,8 +735,10 @@ func wrapError(err error, throttleValue *atomic.Value) error {
}
throttleValue.Store(d)
return rateLimitError{error: err, earliestRetry: time.Now().Add(d)}
- } else if isErrorCapacity(err) {
+ } else if isErrorQuota(err) {
return &ec2QuotaError{err}
+ } else if aerr, ok := err.(awserr.Error); ok && aerr != nil && aerr.Code() == "InsufficientInstanceCapacity" {
+ return &capacityError{err, true}
} else if err != nil {
throttleValue.Store(time.Duration(0))
return err
diff --git a/lib/cloud/ec2/ec2_test.go b/lib/cloud/ec2/ec2_test.go
index 6fde4bbbca..a57fcebf76 100644
--- a/lib/cloud/ec2/ec2_test.go
+++ b/lib/cloud/ec2/ec2_test.go
@@ -508,8 +508,15 @@ func (*EC2InstanceSetSuite) TestWrapError(c *check.C) {
_, ok := wrapped.(cloud.RateLimitError)
c.Check(ok, check.Equals, true)
- quotaError := awserr.New("InsufficientInstanceCapacity", "", nil)
+ quotaError := awserr.New("InstanceLimitExceeded", "", nil)
wrapped = wrapError(quotaError, nil)
_, ok = wrapped.(cloud.QuotaError)
c.Check(ok, check.Equals, true)
+
+ capacityError := awserr.New("InsufficientInstanceCapacity", "", nil)
+ wrapped = wrapError(capacityError, nil)
+ caperr, ok := wrapped.(cloud.CapacityError)
+ c.Check(ok, check.Equals, true)
+ c.Check(caperr.IsCapacityError(), check.Equals, true)
+ c.Check(caperr.IsInstanceTypeSpecific(), check.Equals, true)
}
diff --git a/lib/cloud/interfaces.go b/lib/cloud/interfaces.go
index 7c532fda4a..a2aa9e1432 100644
--- a/lib/cloud/interfaces.go
+++ b/lib/cloud/interfaces.go
@@ -37,6 +37,20 @@ type QuotaError interface {
error
}
+// A CapacityError should be returned by an InstanceSet's Create
+// method when the cloud service indicates it has insufficient
+// capacity to create new instances -- i.e., retrying right away is
+// unlikely to succeed.
+type CapacityError interface {
+ // If true, wait before trying to create more instances.
+ IsCapacityError() bool
+ // If true, the condition is specific to the requested
+ // instance type: wait before trying to create more
+ // instances of that same type, although other instance
+ // types may still be available. If false, the condition
+ // applies to all instance types.
+ IsInstanceTypeSpecific() bool
+ error
+}
+
type SharedResourceTags map[string]string
type InstanceSetID string
type InstanceTags map[string]string
diff --git a/lib/dispatchcloud/scheduler/interfaces.go b/lib/dispatchcloud/scheduler/interfaces.go
index 78f8c804e2..6e56bd8c40 100644
--- a/lib/dispatchcloud/scheduler/interfaces.go
+++ b/lib/dispatchcloud/scheduler/interfaces.go
@@ -34,6 +34,7 @@ type WorkerPool interface {
Running() map[string]time.Time
Unallocated() map[arvados.InstanceType]int
CountWorkers() map[worker.State]int
+ AtCapacity(arvados.InstanceType) bool
AtQuota() bool
Create(arvados.InstanceType) bool
Shutdown(arvados.InstanceType) bool
diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go
index 6a717bf444..3505c3e064 100644
--- a/lib/dispatchcloud/scheduler/run_queue.go
+++ b/lib/dispatchcloud/scheduler/run_queue.go
@@ -149,6 +149,7 @@ func (sch *Scheduler) runQueue() {
}).Debug("runQueue")
dontstart := map[arvados.InstanceType]bool{}
+ var atcapacity = map[string]bool{} // ProviderTypes reported as AtCapacity during this runQueue() invocation
var overquota []container.QueueEnt // entries that are unmappable because of worker pool quota
var overmaxsuper []container.QueueEnt // unmappable because max supervisors (these are not included in overquota)
var containerAllocatedWorkerBootingCount int
@@ -189,6 +190,11 @@ tryrun:
overquota = sorted[i:]
break tryrun
}
+ if unalloc[it] < 1 && (atcapacity[it.ProviderType] || sch.pool.AtCapacity(it)) {
+ logger.Trace("not locking: AtCapacity and no unalloc workers")
+ atcapacity[it.ProviderType] = true
+ continue
+ }
if sch.pool.KillContainer(ctr.UUID, "about to lock") {
logger.Info("not locking: crunch-run process from previous attempt has not exited")
continue
@@ -211,6 +217,27 @@ tryrun:
logger.Trace("overquota")
overquota = sorted[i:]
break tryrun
+ } else if atcapacity[it.ProviderType] || sch.pool.AtCapacity(it) {
+ // Continue trying lower-priority
+ // containers in case they can run on
+ // different instance types that are
+ // available.
+ //
+ // The local "atcapacity" cache helps
+ // when the pool's flag resets after
+ // we look at container A but before
+ // we look at lower-priority container
+ // B. In that case we want to run
+ // container A on the next call to
+ // runQueue(), rather than run
+ // container B now.
+ //
+ // TODO: try running this container on
+ // a bigger (but not much more
+ // expensive) instance type.
+ logger.WithField("InstanceType", it.Name).Trace("at capacity")
+ atcapacity[it.ProviderType] = true
+ continue
} else if sch.pool.Create(it) {
// Success. (Note pool.Create works
// asynchronously and does its own
@@ -219,10 +246,7 @@ tryrun:
logger.Info("creating new instance")
} else {
// Failed despite not being at quota,
- // e.g., cloud ops throttled. TODO:
- // avoid getting starved here if
- // instances of a specific type always
- // fail.
+ // e.g., cloud ops throttled.
logger.Trace("pool declined to create new instance")
continue
}
diff --git a/lib/dispatchcloud/scheduler/run_queue_test.go b/lib/dispatchcloud/scheduler/run_queue_test.go
index a8df944b4c..76eecead5e 100644
--- a/lib/dispatchcloud/scheduler/run_queue_test.go
+++ b/lib/dispatchcloud/scheduler/run_queue_test.go
@@ -32,10 +32,12 @@ var (
type stubPool struct {
notify <-chan struct{}
unalloc map[arvados.InstanceType]int // idle+booting+unknown
+ busy map[arvados.InstanceType]int
idle map[arvados.InstanceType]int
unknown map[arvados.InstanceType]int
running map[string]time.Time
quota int
+ capacity map[string]int
canCreate int
creates []arvados.InstanceType
starts []string
@@ -55,6 +57,20 @@ func (p *stubPool) AtQuota() bool {
}
return n >= p.quota
}
+// AtCapacity reports whether the stub's configured capacity for
+// it.ProviderType has been used up. Every unalloc or busy instance
+// whose ProviderType matches counts against the limit, regardless of
+// its InstanceType name. ProviderTypes with no entry in p.capacity
+// are never at capacity.
+func (p *stubPool) AtCapacity(it arvados.InstanceType) bool {
+ supply, ok := p.capacity[it.ProviderType]
+ if !ok {
+ return false
+ }
+ for _, existing := range []map[arvados.InstanceType]int{p.unalloc, p.busy} {
+ for eit, n := range existing {
+ if eit.ProviderType == it.ProviderType {
+ supply -= n
+ }
+ }
+ }
+ return supply < 1
+}
func (p *stubPool) Subscribe() <-chan struct{} { return p.notify }
func (p *stubPool) Unsubscribe(<-chan struct{}) {}
func (p *stubPool) Running() map[string]time.Time {
@@ -116,6 +132,7 @@ func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container
if p.idle[it] == 0 {
return false
}
+ p.busy[it]++
p.idle[it]--
p.unalloc[it]--
p.running[ctr.UUID] = time.Time{}
@@ -186,6 +203,7 @@ func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
test.InstanceType(1): 1,
test.InstanceType(2): 2,
},
+ busy: map[arvados.InstanceType]int{},
running: map[string]time.Time{},
canCreate: 0,
}
@@ -236,6 +254,7 @@ func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
idle: map[arvados.InstanceType]int{
test.InstanceType(2): 2,
},
+ busy: map[arvados.InstanceType]int{},
running: map[string]time.Time{},
creates: []arvados.InstanceType{},
starts: []string{},
@@ -272,6 +291,85 @@ func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
}
}
+// If pool.AtCapacity(it) is true for one instance type, try running a
+// lower-priority container that uses a different node type. Existing
+// idle workers of the affected type may still be used, but don't
+// lock/unlock any container or create any new instance of that type.
+func (*SchedulerSuite) TestInstanceCapacity(c *check.C) {
+ ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
+
+ queue := test.Queue{
+ ChooseType: chooseType,
+ Containers: []arvados.Container{
+ {
+ UUID: test.ContainerUUID(1),
+ Priority: 1,
+ State: arvados.ContainerStateLocked,
+ RuntimeConstraints: arvados.RuntimeConstraints{
+ VCPUs: 1,
+ RAM: 1 << 30,
+ },
+ },
+ {
+ UUID: test.ContainerUUID(2),
+ Priority: 2,
+ State: arvados.ContainerStateQueued,
+ RuntimeConstraints: arvados.RuntimeConstraints{
+ VCPUs: 4,
+ RAM: 4 << 30,
+ },
+ },
+ {
+ UUID: test.ContainerUUID(3),
+ Priority: 3,
+ State: arvados.ContainerStateLocked,
+ RuntimeConstraints: arvados.RuntimeConstraints{
+ VCPUs: 4,
+ RAM: 4 << 30,
+ },
+ },
+ {
+ UUID: test.ContainerUUID(4),
+ Priority: 4,
+ State: arvados.ContainerStateLocked,
+ RuntimeConstraints: arvados.RuntimeConstraints{
+ VCPUs: 4,
+ RAM: 4 << 30,
+ },
+ },
+ },
+ }
+ queue.Update()
+ pool := stubPool{
+ quota: 99,
+ capacity: map[string]int{test.InstanceType(4).ProviderType: 1},
+ unalloc: map[arvados.InstanceType]int{
+ test.InstanceType(4): 1,
+ },
+ idle: map[arvados.InstanceType]int{
+ test.InstanceType(4): 1,
+ },
+ busy: map[arvados.InstanceType]int{},
+ running: map[string]time.Time{},
+ creates: []arvados.InstanceType{},
+ starts: []string{},
+ canCreate: 99,
+ }
+ sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0)
+ sch.sync()
+ sch.runQueue()
+ sch.sync()
+
+ // Start container4 on the existing idle type4 worker; after
+ // that the pool reports AtCapacity for type4, so we skip
+ // trying to create an instance for container3 and don't lock
+ // container2 (same constraints as container3), but do try to
+ // create a type1 instance for container1.
+ c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4), test.ContainerUUID(1)})
+ c.Check(pool.shutdowns, check.Equals, 0)
+ c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1)})
+ c.Check(queue.StateChanges(), check.HasLen, 0)
+}
+
// Don't unlock containers or shutdown unalloc (booting/idle) nodes
// just because some 503 errors caused us to reduce maxConcurrency
// below the current load level.
@@ -347,6 +445,10 @@ func (*SchedulerSuite) TestIdleIn503QuietPeriod(c *check.C) {
test.InstanceType(2): 1,
test.InstanceType(3): 1,
},
+ busy: map[arvados.InstanceType]int{
+ test.InstanceType(2): 1,
+ test.InstanceType(3): 1,
+ },
running: map[string]time.Time{
test.ContainerUUID(1): {},
test.ContainerUUID(3): {},
@@ -400,6 +502,9 @@ func (*SchedulerSuite) TestUnlockExcessSupervisors(c *check.C) {
idle: map[arvados.InstanceType]int{
test.InstanceType(2): 1,
},
+ busy: map[arvados.InstanceType]int{
+ test.InstanceType(2): 4,
+ },
running: map[string]time.Time{
test.ContainerUUID(1): {},
test.ContainerUUID(2): {},
@@ -461,6 +566,9 @@ func (*SchedulerSuite) TestExcessSupervisors(c *check.C) {
idle: map[arvados.InstanceType]int{
test.InstanceType(2): 1,
},
+ busy: map[arvados.InstanceType]int{
+ test.InstanceType(2): 2,
+ },
running: map[string]time.Time{
test.ContainerUUID(5): {},
test.ContainerUUID(6): {},
@@ -515,6 +623,7 @@ func (*SchedulerSuite) TestEqualPriorityContainers(c *check.C) {
idle: map[arvados.InstanceType]int{
test.InstanceType(3): 2,
},
+ busy: map[arvados.InstanceType]int{},
running: map[string]time.Time{},
creates: []arvados.InstanceType{},
starts: []string{},
@@ -553,6 +662,7 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
test.InstanceType(1): 1,
test.InstanceType(2): 1,
},
+ busy: map[arvados.InstanceType]int{},
running: map[string]time.Time{},
canCreate: 4,
}
@@ -646,6 +756,9 @@ func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
idle: map[arvados.InstanceType]int{
test.InstanceType(2): 0,
},
+ busy: map[arvados.InstanceType]int{
+ test.InstanceType(2): 1,
+ },
running: map[string]time.Time{
test.ContainerUUID(2): {},
},
@@ -742,6 +855,7 @@ func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
pool = stubPool{
idle: map[arvados.InstanceType]int{test.InstanceType(1): 1},
unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
+ busy: map[arvados.InstanceType]int{},
running: map[string]time.Time{},
}
sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0)
@@ -815,6 +929,7 @@ func (*SchedulerSuite) TestSkipSupervisors(c *check.C) {
test.InstanceType(1): 4,
test.InstanceType(2): 4,
},
+ busy: map[arvados.InstanceType]int{},
running: map[string]time.Time{},
canCreate: 0,
}
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 15b0dbcde5..fc9f5445d6 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -82,6 +82,9 @@ const (
// instances have been shutdown.
quotaErrorTTL = time.Minute
+ // Time after a capacity error to try again
+ capacityErrorTTL = time.Minute
+
// Time between "X failed because rate limiting" messages
logRateLimitErrorInterval = time.Second * 10
)
@@ -181,6 +184,7 @@ type Pool struct {
atQuotaUntilFewerInstances int
atQuotaUntil time.Time
atQuotaErr cloud.QuotaError
+ atCapacityUntil map[string]time.Time
stop chan bool
mtx sync.RWMutex
setupOnce sync.Once
@@ -320,14 +324,11 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
// Boot probe is certain to fail.
return false
}
- wp.mtx.Lock()
- defer wp.mtx.Unlock()
- if time.Now().Before(wp.atQuotaUntil) ||
- wp.atQuotaUntilFewerInstances > 0 ||
- wp.instanceSet.throttleCreate.Error() != nil ||
- (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating)) {
+ if wp.AtCapacity(it) || wp.AtQuota() || wp.instanceSet.throttleCreate.Error() != nil {
return false
}
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
// The maxConcurrentInstanceCreateOps knob throttles the number of node create
// requests in flight. It was added to work around a limitation in Azure's
// managed disks, which support no more than 20 concurrent node creation
@@ -381,6 +382,18 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
logger.WithField("atQuotaUntilFewerInstances", n).Info("quota error -- waiting for next instance shutdown")
}
}
+ if err, ok := err.(cloud.CapacityError); ok && err.IsCapacityError() {
+ capKey := it.ProviderType
+ if !err.IsInstanceTypeSpecific() {
+ // set capacity flag for all
+ // instance types
+ capKey = ""
+ }
+ if wp.atCapacityUntil == nil {
+ wp.atCapacityUntil = map[string]time.Time{}
+ }
+ wp.atCapacityUntil[capKey] = time.Now().Add(capacityErrorTTL)
+ }
logger.WithError(err).Error("create failed")
wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
return
@@ -393,6 +406,22 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
return true
}
+// AtCapacity returns true if Create() is currently expected to fail
+// for the given instance type. This is the case when the cloud
+// provider recently reported insufficient capacity, either for
+// it.ProviderType specifically or for all instance types (recorded
+// under the "" key). Create records these entries in
+// wp.atCapacityUntil when instance creation fails with a
+// cloud.CapacityError; each expires capacityErrorTTL after it is set.
+//
+// AtCapacity acquires wp.mtx, so callers must not already hold it
+// (Create calls AtCapacity before taking the lock).
+func (wp *Pool) AtCapacity(it arvados.InstanceType) bool {
+ // NOTE(review): this method only reads wp.atCapacityUntil and
+ // wp.mtx is a sync.RWMutex, so RLock/RUnlock may suffice --
+ // confirm every writer of atCapacityUntil holds the write lock.
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
+ // atCapacityUntil is allocated lazily and may still be nil;
+ // lookups on a nil map simply report !ok.
+ if t, ok := wp.atCapacityUntil[it.ProviderType]; ok && time.Now().Before(t) {
+ // at capacity for this instance type
+ return true
+ }
+ if t, ok := wp.atCapacityUntil[""]; ok && time.Now().Before(t) {
+ // at capacity for all instance types
+ return true
+ }
+ return false
+}
+
// AtQuota returns true if Create is not expected to work at the
// moment (e.g., cloud provider has reported quota errors, or we are
// already at our own configured quota).
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list