[arvados] created: 2.6.0-276-g69148e44f
git repository hosting
git at public.arvados.org
Thu Jun 15 19:54:27 UTC 2023
at 69148e44f11b50f1c4b1c1bc4c1871ab10c3e893 (commit)
commit 69148e44f11b50f1c4b1c1bc4c1871ab10c3e893
Author: Tom Clegg <tom at curii.com>
Date: Thu Jun 15 15:53:34 2023 -0400
20601: MaxSupervisorFraction is relative to maxConcurrency.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
index e3b4b251d..97cbd8edc 100644
--- a/lib/dispatchcloud/dispatcher.go
+++ b/lib/dispatchcloud/dispatcher.go
@@ -198,11 +198,7 @@ func (disp *dispatcher) run() {
if pollInterval <= 0 {
pollInterval = defaultPollInterval
}
- maxSupervisors := int(float64(disp.Cluster.Containers.CloudVMs.MaxInstances) * disp.Cluster.Containers.CloudVMs.SupervisorFraction)
- if maxSupervisors == 0 && disp.Cluster.Containers.CloudVMs.SupervisorFraction > 0 {
- maxSupervisors = 1
- }
- sched := scheduler.New(disp.Context, disp.ArvClient, disp.queue, disp.pool, disp.Registry, staleLockTimeout, pollInterval, maxSupervisors)
+ sched := scheduler.New(disp.Context, disp.ArvClient, disp.queue, disp.pool, disp.Registry, staleLockTimeout, pollInterval, disp.Cluster.Containers.CloudVMs.MaxInstances, disp.Cluster.Containers.CloudVMs.SupervisorFraction)
sched.Start()
defer sched.Stop()
diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go
index dcb348878..e385e758f 100644
--- a/lib/dispatchcloud/scheduler/run_queue.go
+++ b/lib/dispatchcloud/scheduler/run_queue.go
@@ -89,8 +89,16 @@ func (sch *Scheduler) runQueue() {
} else {
sch.mLast503Time.Set(float64(sch.last503time.Unix()))
}
+ if sch.maxInstances > 0 && sch.maxConcurrency > sch.maxInstances {
+ sch.maxConcurrency = sch.maxInstances
+ }
sch.mMaxContainerConcurrency.Set(float64(sch.maxConcurrency))
+ maxSupervisors := int(float64(sch.maxConcurrency) * sch.maxSupervisors)
+ if maxSupervisors < 1 && sch.maxSupervisors > 0 && sch.maxConcurrency > 0 {
+ maxSupervisors = 1
+ }
+
sch.logger.WithFields(logrus.Fields{
"Containers": len(sorted),
"Processes": len(running),
@@ -118,7 +126,7 @@ tryrun:
})
if ctr.SchedulingParameters.Supervisor {
supervisors += 1
- if sch.maxSupervisors > 0 && supervisors > sch.maxSupervisors {
+ if maxSupervisors > 0 && supervisors > maxSupervisors {
overmaxsuper = append(overmaxsuper, sorted[i])
continue
}
diff --git a/lib/dispatchcloud/scheduler/run_queue_test.go b/lib/dispatchcloud/scheduler/run_queue_test.go
index 73602f810..60917a059 100644
--- a/lib/dispatchcloud/scheduler/run_queue_test.go
+++ b/lib/dispatchcloud/scheduler/run_queue_test.go
@@ -195,7 +195,7 @@ func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
running: map[string]time.Time{},
canCreate: 0,
}
- New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0).runQueue()
+ New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0).runQueue()
c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1), test.InstanceType(1), test.InstanceType(1)})
c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
c.Check(pool.running, check.HasLen, 1)
@@ -247,7 +247,7 @@ func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
starts: []string{},
canCreate: 0,
}
- sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
+ sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0)
sch.sync()
sch.runQueue()
sch.sync()
@@ -361,7 +361,7 @@ func (*SchedulerSuite) TestIdleIn503QuietPeriod(c *check.C) {
starts: []string{},
canCreate: 0,
}
- sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
+ sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0)
sch.last503time = time.Now()
sch.maxConcurrency = 3
sch.sync()
@@ -416,7 +416,7 @@ func (*SchedulerSuite) TestUnlockExcessSupervisors(c *check.C) {
starts: []string{},
canCreate: 0,
}
- sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 4)
+ sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 8, 0.5)
sch.sync()
sch.runQueue()
sch.sync()
@@ -475,7 +475,7 @@ func (*SchedulerSuite) TestExcessSupervisors(c *check.C) {
starts: []string{},
canCreate: 0,
}
- sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 4)
+ sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 8, 0.5)
sch.sync()
sch.runQueue()
sch.sync()
@@ -526,7 +526,7 @@ func (*SchedulerSuite) TestEqualPriorityContainers(c *check.C) {
starts: []string{},
canCreate: 0,
}
- sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
+ sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0)
for i := 0; i < 30; i++ {
sch.runQueue()
sch.sync()
@@ -628,7 +628,7 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
},
}
queue.Update()
- New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0).runQueue()
+ New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0).runQueue()
c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
running := map[string]bool{}
@@ -672,7 +672,7 @@ func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
},
}
queue.Update()
- sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
+ sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0)
c.Check(pool.running, check.HasLen, 1)
sch.sync()
for deadline := time.Now().Add(time.Second); len(pool.Running()) > 0 && time.Now().Before(deadline); time.Sleep(time.Millisecond) {
@@ -705,7 +705,7 @@ func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
pool := stubPool{
unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
}
- sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
+ sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0)
sch.runQueue()
sch.updateMetrics()
@@ -717,7 +717,7 @@ func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
// 'over quota' metric will be 1 because no workers are available and canCreate defaults
// to zero.
pool = stubPool{}
- sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
+ sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0)
sch.runQueue()
sch.updateMetrics()
@@ -750,7 +750,7 @@ func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
running: map[string]time.Time{},
}
- sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
+ sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0)
sch.runQueue()
sch.updateMetrics()
@@ -824,7 +824,7 @@ func (*SchedulerSuite) TestSkipSupervisors(c *check.C) {
running: map[string]time.Time{},
canCreate: 0,
}
- New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 2).runQueue()
+ New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 10, 0.2).runQueue()
c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType(nil))
c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4), test.ContainerUUID(3), test.ContainerUUID(1)})
}
diff --git a/lib/dispatchcloud/scheduler/scheduler.go b/lib/dispatchcloud/scheduler/scheduler.go
index 892f2f3ca..70b352d64 100644
--- a/lib/dispatchcloud/scheduler/scheduler.go
+++ b/lib/dispatchcloud/scheduler/scheduler.go
@@ -48,7 +48,8 @@ type Scheduler struct {
last503time time.Time // last time API responded 503
maxConcurrency int // dynamic container limit (0 = unlimited), see runQueue()
- maxSupervisors int // maximum number of "supervisor" containers (these are containers who's main job is to launch other containers, e.g. workflow runners)
+ maxSupervisors float64 // maximum fraction of "supervisor" containers (these are containers whose main job is to launch other containers, e.g. workflow runners)
+ maxInstances int // maximum number of instances the pool will bring up (0 = unlimited)
mContainersAllocatedNotStarted prometheus.Gauge
mContainersNotAllocatedOverQuota prometheus.Gauge
@@ -61,7 +62,7 @@ type Scheduler struct {
//
// Any given queue and pool should not be used by more than one
// scheduler at a time.
-func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration, maxSupervisors int) *Scheduler {
+func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration, maxInstances int, maxSupervisors float64) *Scheduler {
sch := &Scheduler{
logger: ctxlog.FromContext(ctx),
client: client,
@@ -74,7 +75,9 @@ func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool
stop: make(chan struct{}),
stopped: make(chan struct{}),
uuidOp: map[string]string{},
+ maxConcurrency: maxInstances, // initial value -- will be dynamically adjusted
maxSupervisors: maxSupervisors,
+ maxInstances: maxInstances,
}
sch.registerMetrics(reg)
return sch
diff --git a/lib/dispatchcloud/scheduler/sync_test.go b/lib/dispatchcloud/scheduler/sync_test.go
index 788d94648..1fc56cb8e 100644
--- a/lib/dispatchcloud/scheduler/sync_test.go
+++ b/lib/dispatchcloud/scheduler/sync_test.go
@@ -48,7 +48,7 @@ func (*SchedulerSuite) TestForgetIrrelevantContainers(c *check.C) {
ents, _ := queue.Entries()
c.Check(ents, check.HasLen, 1)
- sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
+ sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0)
sch.sync()
ents, _ = queue.Entries()
@@ -80,7 +80,7 @@ func (*SchedulerSuite) TestCancelOrphanedContainers(c *check.C) {
ents, _ := queue.Entries()
c.Check(ents, check.HasLen, 1)
- sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
+ sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0)
// Sync shouldn't cancel the container because it might be
// running on the VM with state=="unknown".
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list