[arvados] created: 2.7.0-4998-gc29c2feb69

git repository hosting git at public.arvados.org
Mon Oct 16 19:07:45 UTC 2023


        at  c29c2feb691b5abbffaf8f177222d2973ad1b19f (commit)


commit c29c2feb691b5abbffaf8f177222d2973ad1b19f
Author: Tom Clegg <tom at curii.com>
Date:   Mon Oct 16 15:04:36 2023 -0400

    20984: Handle "instance type not available" condition better.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/cloud/ec2/ec2.go b/lib/cloud/ec2/ec2.go
index 5e4df05f46..816df48d90 100644
--- a/lib/cloud/ec2/ec2.go
+++ b/lib/cloud/ec2/ec2.go
@@ -665,21 +665,36 @@ func (err rateLimitError) EarliestRetry() time.Time {
 	return err.earliestRetry
 }
 
-var isCodeCapacity = map[string]bool{
+type capacityError struct {
+	error
+	isInstanceTypeSpecific bool
+}
+
+func (er *capacityError) IsCapacityError() bool {
+	return true
+}
+
+func (er *capacityError) IsInstanceTypeSpecific() bool {
+	return er.isInstanceTypeSpecific
+}
+
+var isCodeQuota = map[string]bool{
 	"InstanceLimitExceeded":             true,
 	"InsufficientAddressCapacity":       true,
 	"InsufficientFreeAddressesInSubnet": true,
-	"InsufficientInstanceCapacity":      true,
 	"InsufficientVolumeCapacity":        true,
 	"MaxSpotInstanceCountExceeded":      true,
 	"VcpuLimitExceeded":                 true,
 }
 
-// isErrorCapacity returns whether the error is to be throttled based on its code.
+// isErrorQuota returns whether the error indicates we have reached
+// some usage quota/limit -- i.e., immediately retrying with an equal
+// or larger instance type will probably not work.
+//
 // Returns false if error is nil.
-func isErrorCapacity(err error) bool {
+func isErrorQuota(err error) bool {
 	if aerr, ok := err.(awserr.Error); ok && aerr != nil {
-		if _, ok := isCodeCapacity[aerr.Code()]; ok {
+		if _, ok := isCodeQuota[aerr.Code()]; ok {
 			return true
 		}
 	}
@@ -720,8 +735,10 @@ func wrapError(err error, throttleValue *atomic.Value) error {
 		}
 		throttleValue.Store(d)
 		return rateLimitError{error: err, earliestRetry: time.Now().Add(d)}
-	} else if isErrorCapacity(err) {
+	} else if isErrorQuota(err) {
 		return &ec2QuotaError{err}
+	} else if aerr, ok := err.(awserr.Error); ok && aerr != nil && aerr.Code() == "InsufficientInstanceCapacity" {
+		return &capacityError{err, true}
 	} else if err != nil {
 		throttleValue.Store(time.Duration(0))
 		return err
diff --git a/lib/cloud/ec2/ec2_test.go b/lib/cloud/ec2/ec2_test.go
index 6fde4bbbca..a57fcebf76 100644
--- a/lib/cloud/ec2/ec2_test.go
+++ b/lib/cloud/ec2/ec2_test.go
@@ -508,8 +508,15 @@ func (*EC2InstanceSetSuite) TestWrapError(c *check.C) {
 	_, ok := wrapped.(cloud.RateLimitError)
 	c.Check(ok, check.Equals, true)
 
-	quotaError := awserr.New("InsufficientInstanceCapacity", "", nil)
+	quotaError := awserr.New("InstanceLimitExceeded", "", nil)
 	wrapped = wrapError(quotaError, nil)
 	_, ok = wrapped.(cloud.QuotaError)
 	c.Check(ok, check.Equals, true)
+
+	capacityError := awserr.New("InsufficientInstanceCapacity", "", nil)
+	wrapped = wrapError(capacityError, nil)
+	caperr, ok := wrapped.(cloud.CapacityError)
+	c.Check(ok, check.Equals, true)
+	c.Check(caperr.IsCapacityError(), check.Equals, true)
+	c.Check(caperr.IsInstanceTypeSpecific(), check.Equals, true)
 }
diff --git a/lib/cloud/interfaces.go b/lib/cloud/interfaces.go
index 7c532fda4a..a2aa9e1432 100644
--- a/lib/cloud/interfaces.go
+++ b/lib/cloud/interfaces.go
@@ -37,6 +37,20 @@ type QuotaError interface {
 	error
 }
 
+// A CapacityError should be returned by an InstanceSet's Create
+// method when the cloud service indicates it has insufficient
+// capacity to create new instances -- i.e., we shouldn't retry right
+// away.
+type CapacityError interface {
+	// If true, wait before trying to create more instances.
+	IsCapacityError() bool
+	// If true, the condition is specific to the requested
+	// instance types.  Wait before trying to create more
+	// instances of that same type.
+	IsInstanceTypeSpecific() bool
+	error
+}
+
 type SharedResourceTags map[string]string
 type InstanceSetID string
 type InstanceTags map[string]string
diff --git a/lib/dispatchcloud/scheduler/interfaces.go b/lib/dispatchcloud/scheduler/interfaces.go
index 78f8c804e2..6e56bd8c40 100644
--- a/lib/dispatchcloud/scheduler/interfaces.go
+++ b/lib/dispatchcloud/scheduler/interfaces.go
@@ -34,6 +34,7 @@ type WorkerPool interface {
 	Running() map[string]time.Time
 	Unallocated() map[arvados.InstanceType]int
 	CountWorkers() map[worker.State]int
+	AtCapacity(arvados.InstanceType) bool
 	AtQuota() bool
 	Create(arvados.InstanceType) bool
 	Shutdown(arvados.InstanceType) bool
diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go
index 6a717bf444..3505c3e064 100644
--- a/lib/dispatchcloud/scheduler/run_queue.go
+++ b/lib/dispatchcloud/scheduler/run_queue.go
@@ -149,6 +149,7 @@ func (sch *Scheduler) runQueue() {
 	}).Debug("runQueue")
 
 	dontstart := map[arvados.InstanceType]bool{}
+	var atcapacity = map[string]bool{}    // ProviderTypes reported as AtCapacity during this runQueue() invocation
 	var overquota []container.QueueEnt    // entries that are unmappable because of worker pool quota
 	var overmaxsuper []container.QueueEnt // unmappable because max supervisors (these are not included in overquota)
 	var containerAllocatedWorkerBootingCount int
@@ -189,6 +190,11 @@ tryrun:
 				overquota = sorted[i:]
 				break tryrun
 			}
+			if unalloc[it] < 1 && (atcapacity[it.ProviderType] || sch.pool.AtCapacity(it)) {
+				logger.Trace("not locking: AtCapacity and no unalloc workers")
+				atcapacity[it.ProviderType] = true
+				continue
+			}
 			if sch.pool.KillContainer(ctr.UUID, "about to lock") {
 				logger.Info("not locking: crunch-run process from previous attempt has not exited")
 				continue
@@ -211,6 +217,27 @@ tryrun:
 				logger.Trace("overquota")
 				overquota = sorted[i:]
 				break tryrun
+			} else if atcapacity[it.ProviderType] || sch.pool.AtCapacity(it) {
+				// Continue trying lower-priority
+				// containers in case they can run on
+				// different instance types that are
+				// available.
+				//
+				// The local "atcapacity" cache helps
+				// when the pool's flag resets after
+				// we look at container A but before
+				// we look at lower-priority container
+				// B. In that case we want to run
+				// container A on the next call to
+				// runQueue(), rather than run
+				// container B now.
+				//
+				// TODO: try running this container on
+				// a bigger (but not much more
+				// expensive) instance type.
+				logger.WithField("InstanceType", it.Name).Trace("at capacity")
+				atcapacity[it.ProviderType] = true
+				continue
 			} else if sch.pool.Create(it) {
 				// Success. (Note pool.Create works
 				// asynchronously and does its own
@@ -219,10 +246,7 @@ tryrun:
 				logger.Info("creating new instance")
 			} else {
 				// Failed despite not being at quota,
-				// e.g., cloud ops throttled.  TODO:
-				// avoid getting starved here if
-				// instances of a specific type always
-				// fail.
+				// e.g., cloud ops throttled.
 				logger.Trace("pool declined to create new instance")
 				continue
 			}
diff --git a/lib/dispatchcloud/scheduler/run_queue_test.go b/lib/dispatchcloud/scheduler/run_queue_test.go
index a8df944b4c..76eecead5e 100644
--- a/lib/dispatchcloud/scheduler/run_queue_test.go
+++ b/lib/dispatchcloud/scheduler/run_queue_test.go
@@ -32,10 +32,12 @@ var (
 type stubPool struct {
 	notify    <-chan struct{}
 	unalloc   map[arvados.InstanceType]int // idle+booting+unknown
+	busy      map[arvados.InstanceType]int
 	idle      map[arvados.InstanceType]int
 	unknown   map[arvados.InstanceType]int
 	running   map[string]time.Time
 	quota     int
+	capacity  map[string]int
 	canCreate int
 	creates   []arvados.InstanceType
 	starts    []string
@@ -55,6 +57,20 @@ func (p *stubPool) AtQuota() bool {
 	}
 	return n >= p.quota
 }
+func (p *stubPool) AtCapacity(it arvados.InstanceType) bool {
+	supply, ok := p.capacity[it.ProviderType]
+	if !ok {
+		return false
+	}
+	for _, existing := range []map[arvados.InstanceType]int{p.unalloc, p.busy} {
+		for eit, n := range existing {
+			if eit.ProviderType == it.ProviderType {
+				supply -= n
+			}
+		}
+	}
+	return supply < 1
+}
 func (p *stubPool) Subscribe() <-chan struct{}  { return p.notify }
 func (p *stubPool) Unsubscribe(<-chan struct{}) {}
 func (p *stubPool) Running() map[string]time.Time {
@@ -116,6 +132,7 @@ func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container
 	if p.idle[it] == 0 {
 		return false
 	}
+	p.busy[it]++
 	p.idle[it]--
 	p.unalloc[it]--
 	p.running[ctr.UUID] = time.Time{}
@@ -186,6 +203,7 @@ func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
 			test.InstanceType(1): 1,
 			test.InstanceType(2): 2,
 		},
+		busy:      map[arvados.InstanceType]int{},
 		running:   map[string]time.Time{},
 		canCreate: 0,
 	}
@@ -236,6 +254,7 @@ func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
 			idle: map[arvados.InstanceType]int{
 				test.InstanceType(2): 2,
 			},
+			busy:      map[arvados.InstanceType]int{},
 			running:   map[string]time.Time{},
 			creates:   []arvados.InstanceType{},
 			starts:    []string{},
@@ -272,6 +291,85 @@ func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
 	}
 }
 
+// If pool.AtCapacity(it) is true for one instance type, try running a
+// lower-priority container that uses a different node type.  Don't
+// lock/unlock/start any container that requires the affected instance
+// type.
+func (*SchedulerSuite) TestInstanceCapacity(c *check.C) {
+	ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
+
+	queue := test.Queue{
+		ChooseType: chooseType,
+		Containers: []arvados.Container{
+			{
+				UUID:     test.ContainerUUID(1),
+				Priority: 1,
+				State:    arvados.ContainerStateLocked,
+				RuntimeConstraints: arvados.RuntimeConstraints{
+					VCPUs: 1,
+					RAM:   1 << 30,
+				},
+			},
+			{
+				UUID:     test.ContainerUUID(2),
+				Priority: 2,
+				State:    arvados.ContainerStateQueued,
+				RuntimeConstraints: arvados.RuntimeConstraints{
+					VCPUs: 4,
+					RAM:   4 << 30,
+				},
+			},
+			{
+				UUID:     test.ContainerUUID(3),
+				Priority: 3,
+				State:    arvados.ContainerStateLocked,
+				RuntimeConstraints: arvados.RuntimeConstraints{
+					VCPUs: 4,
+					RAM:   4 << 30,
+				},
+			},
+			{
+				UUID:     test.ContainerUUID(4),
+				Priority: 4,
+				State:    arvados.ContainerStateLocked,
+				RuntimeConstraints: arvados.RuntimeConstraints{
+					VCPUs: 4,
+					RAM:   4 << 30,
+				},
+			},
+		},
+	}
+	queue.Update()
+	pool := stubPool{
+		quota:    99,
+		capacity: map[string]int{test.InstanceType(4).ProviderType: 1},
+		unalloc: map[arvados.InstanceType]int{
+			test.InstanceType(4): 1,
+		},
+		idle: map[arvados.InstanceType]int{
+			test.InstanceType(4): 1,
+		},
+		busy:      map[arvados.InstanceType]int{},
+		running:   map[string]time.Time{},
+		creates:   []arvados.InstanceType{},
+		starts:    []string{},
+		canCreate: 99,
+	}
+	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0)
+	sch.sync()
+	sch.runQueue()
+	sch.sync()
+
+	// Start container4, but then pool reports AtCapacity for
+	// type4, so we skip trying to create an instance for
+	// container3, but do try to create a type2 instance for
+	// container2.
+	c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4), test.ContainerUUID(1)})
+	c.Check(pool.shutdowns, check.Equals, 0)
+	c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1)})
+	c.Check(queue.StateChanges(), check.HasLen, 0)
+}
+
 // Don't unlock containers or shutdown unalloc (booting/idle) nodes
 // just because some 503 errors caused us to reduce maxConcurrency
 // below the current load level.
@@ -347,6 +445,10 @@ func (*SchedulerSuite) TestIdleIn503QuietPeriod(c *check.C) {
 			test.InstanceType(2): 1,
 			test.InstanceType(3): 1,
 		},
+		busy: map[arvados.InstanceType]int{
+			test.InstanceType(2): 1,
+			test.InstanceType(3): 1,
+		},
 		running: map[string]time.Time{
 			test.ContainerUUID(1): {},
 			test.ContainerUUID(3): {},
@@ -400,6 +502,9 @@ func (*SchedulerSuite) TestUnlockExcessSupervisors(c *check.C) {
 		idle: map[arvados.InstanceType]int{
 			test.InstanceType(2): 1,
 		},
+		busy: map[arvados.InstanceType]int{
+			test.InstanceType(2): 4,
+		},
 		running: map[string]time.Time{
 			test.ContainerUUID(1): {},
 			test.ContainerUUID(2): {},
@@ -461,6 +566,9 @@ func (*SchedulerSuite) TestExcessSupervisors(c *check.C) {
 		idle: map[arvados.InstanceType]int{
 			test.InstanceType(2): 1,
 		},
+		busy: map[arvados.InstanceType]int{
+			test.InstanceType(2): 2,
+		},
 		running: map[string]time.Time{
 			test.ContainerUUID(5): {},
 			test.ContainerUUID(6): {},
@@ -515,6 +623,7 @@ func (*SchedulerSuite) TestEqualPriorityContainers(c *check.C) {
 		idle: map[arvados.InstanceType]int{
 			test.InstanceType(3): 2,
 		},
+		busy:      map[arvados.InstanceType]int{},
 		running:   map[string]time.Time{},
 		creates:   []arvados.InstanceType{},
 		starts:    []string{},
@@ -553,6 +662,7 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
 			test.InstanceType(1): 1,
 			test.InstanceType(2): 1,
 		},
+		busy:      map[arvados.InstanceType]int{},
 		running:   map[string]time.Time{},
 		canCreate: 4,
 	}
@@ -646,6 +756,9 @@ func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
 		idle: map[arvados.InstanceType]int{
 			test.InstanceType(2): 0,
 		},
+		busy: map[arvados.InstanceType]int{
+			test.InstanceType(2): 1,
+		},
 		running: map[string]time.Time{
 			test.ContainerUUID(2): {},
 		},
@@ -742,6 +855,7 @@ func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
 	pool = stubPool{
 		idle:    map[arvados.InstanceType]int{test.InstanceType(1): 1},
 		unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
+		busy:    map[arvados.InstanceType]int{},
 		running: map[string]time.Time{},
 	}
 	sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0)
@@ -815,6 +929,7 @@ func (*SchedulerSuite) TestSkipSupervisors(c *check.C) {
 			test.InstanceType(1): 4,
 			test.InstanceType(2): 4,
 		},
+		busy:      map[arvados.InstanceType]int{},
 		running:   map[string]time.Time{},
 		canCreate: 0,
 	}
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 15b0dbcde5..fc9f5445d6 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -82,6 +82,9 @@ const (
 	// instances have been shutdown.
 	quotaErrorTTL = time.Minute
 
+	// Time after a capacity error to try again
+	capacityErrorTTL = time.Minute
+
 	// Time between "X failed because rate limiting" messages
 	logRateLimitErrorInterval = time.Second * 10
 )
@@ -181,6 +184,7 @@ type Pool struct {
 	atQuotaUntilFewerInstances int
 	atQuotaUntil               time.Time
 	atQuotaErr                 cloud.QuotaError
+	atCapacityUntil            map[string]time.Time
 	stop                       chan bool
 	mtx                        sync.RWMutex
 	setupOnce                  sync.Once
@@ -320,14 +324,11 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
 		// Boot probe is certain to fail.
 		return false
 	}
-	wp.mtx.Lock()
-	defer wp.mtx.Unlock()
-	if time.Now().Before(wp.atQuotaUntil) ||
-		wp.atQuotaUntilFewerInstances > 0 ||
-		wp.instanceSet.throttleCreate.Error() != nil ||
-		(wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating)) {
+	if wp.AtCapacity(it) || wp.AtQuota() || wp.instanceSet.throttleCreate.Error() != nil {
 		return false
 	}
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
 	// The maxConcurrentInstanceCreateOps knob throttles the number of node create
 	// requests in flight. It was added to work around a limitation in Azure's
 	// managed disks, which support no more than 20 concurrent node creation
@@ -381,6 +382,18 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
 					logger.WithField("atQuotaUntilFewerInstances", n).Info("quota error -- waiting for next instance shutdown")
 				}
 			}
+			if err, ok := err.(cloud.CapacityError); ok && err.IsCapacityError() {
+				capKey := it.ProviderType
+				if !err.IsInstanceTypeSpecific() {
+					// set capacity flag for all
+					// instance types
+					capKey = ""
+				}
+				if wp.atCapacityUntil == nil {
+					wp.atCapacityUntil = map[string]time.Time{}
+				}
+				wp.atCapacityUntil[capKey] = time.Now().Add(capacityErrorTTL)
+			}
 			logger.WithError(err).Error("create failed")
 			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
 			return
@@ -393,6 +406,22 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
 	return true
 }
 
+// AtCapacity returns true if Create() is currently expected to fail
+// for the given instance type.
+func (wp *Pool) AtCapacity(it arvados.InstanceType) bool {
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
+	if t, ok := wp.atCapacityUntil[it.ProviderType]; ok && time.Now().Before(t) {
+		// at capacity for this instance type
+		return true
+	}
+	if t, ok := wp.atCapacityUntil[""]; ok && time.Now().Before(t) {
+		// at capacity for all instance types
+		return true
+	}
+	return false
+}
+
 // AtQuota returns true if Create is not expected to work at the
 // moment (e.g., cloud provider has reported quota errors, or we are
 // already at our own configured quota).

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list