[arvados] created: 2.6.0-276-g69148e44f

git repository hosting git at public.arvados.org
Thu Jun 15 19:54:27 UTC 2023


        at  69148e44f11b50f1c4b1c1bc4c1871ab10c3e893 (commit)


commit 69148e44f11b50f1c4b1c1bc4c1871ab10c3e893
Author: Tom Clegg <tom at curii.com>
Date:   Thu Jun 15 15:53:34 2023 -0400

    20601: MaxSupervisorFraction is relative to maxConcurrency.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
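
The change in a nutshell: the supervisor cap used to be computed once, up front,
as a fraction of the static MaxInstances setting; it is now recomputed inside
runQueue() as a fraction of the dynamic concurrency limit. A minimal standalone
sketch of the new arithmetic (supervisorCap is a hypothetical helper written
for illustration, not part of this commit):

    package main

    import "fmt"

    // supervisorCap mirrors the logic added to runQueue() in the diff
    // below: clamp maxConcurrency to maxInstances, take the configured
    // fraction of the result, round down, but never go below 1 while
    // the fraction and the concurrency limit are both positive.
    func supervisorCap(maxConcurrency, maxInstances int, fraction float64) int {
        if maxInstances > 0 && maxConcurrency > maxInstances {
            maxConcurrency = maxInstances
        }
        limit := int(float64(maxConcurrency) * fraction)
        if limit < 1 && fraction > 0 && maxConcurrency > 0 {
            limit = 1
        }
        return limit
    }

    func main() {
        fmt.Println(supervisorCap(8, 8, 0.5))  // 4
        fmt.Println(supervisorCap(3, 10, 0.2)) // 0.6 rounds down to 0, bumped to the minimum of 1
    }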

diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
index e3b4b251d..97cbd8edc 100644
--- a/lib/dispatchcloud/dispatcher.go
+++ b/lib/dispatchcloud/dispatcher.go
@@ -198,11 +198,7 @@ func (disp *dispatcher) run() {
 	if pollInterval <= 0 {
 		pollInterval = defaultPollInterval
 	}
-	maxSupervisors := int(float64(disp.Cluster.Containers.CloudVMs.MaxInstances) * disp.Cluster.Containers.CloudVMs.SupervisorFraction)
-	if maxSupervisors == 0 && disp.Cluster.Containers.CloudVMs.SupervisorFraction > 0 {
-		maxSupervisors = 1
-	}
-	sched := scheduler.New(disp.Context, disp.ArvClient, disp.queue, disp.pool, disp.Registry, staleLockTimeout, pollInterval, maxSupervisors)
+	sched := scheduler.New(disp.Context, disp.ArvClient, disp.queue, disp.pool, disp.Registry, staleLockTimeout, pollInterval, disp.Cluster.Containers.CloudVMs.MaxInstances, disp.Cluster.Containers.CloudVMs.SupervisorFraction)
 	sched.Start()
 	defer sched.Stop()
 
diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go
index dcb348878..e385e758f 100644
--- a/lib/dispatchcloud/scheduler/run_queue.go
+++ b/lib/dispatchcloud/scheduler/run_queue.go
@@ -89,8 +89,16 @@ func (sch *Scheduler) runQueue() {
 	} else {
 		sch.mLast503Time.Set(float64(sch.last503time.Unix()))
 	}
+	if sch.maxInstances > 0 && sch.maxConcurrency > sch.maxInstances {
+		sch.maxConcurrency = sch.maxInstances
+	}
 	sch.mMaxContainerConcurrency.Set(float64(sch.maxConcurrency))
 
+	maxSupervisors := int(float64(sch.maxConcurrency) * sch.maxSupervisors)
+	if maxSupervisors < 1 && sch.maxSupervisors > 0 && sch.maxConcurrency > 0 {
+		maxSupervisors = 1
+	}
+
 	sch.logger.WithFields(logrus.Fields{
 		"Containers":     len(sorted),
 		"Processes":      len(running),
@@ -118,7 +126,7 @@ tryrun:
 		})
 		if ctr.SchedulingParameters.Supervisor {
 			supervisors += 1
-			if sch.maxSupervisors > 0 && supervisors > sch.maxSupervisors {
+			if maxSupervisors > 0 && supervisors > maxSupervisors {
 				overmaxsuper = append(overmaxsuper, sorted[i])
 				continue
 			}
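
Worked example using the values from the updated tests below: with
maxInstances = 8 and SupervisorFraction = 0.5, the cap is int(8 * 0.5) = 4
supervisors, the same absolute limit the tests previously passed in directly;
with maxConcurrency = 10 and a fraction of 0.2, the cap is 2. When the product
rounds down to zero but the fraction is positive, the cap is bumped to 1 so
supervisor containers are never starved entirely.
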
diff --git a/lib/dispatchcloud/scheduler/run_queue_test.go b/lib/dispatchcloud/scheduler/run_queue_test.go
index 73602f810..60917a059 100644
--- a/lib/dispatchcloud/scheduler/run_queue_test.go
+++ b/lib/dispatchcloud/scheduler/run_queue_test.go
@@ -195,7 +195,7 @@ func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
 		running:   map[string]time.Time{},
 		canCreate: 0,
 	}
-	New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0).runQueue()
+	New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0).runQueue()
 	c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1), test.InstanceType(1), test.InstanceType(1)})
 	c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
 	c.Check(pool.running, check.HasLen, 1)
@@ -247,7 +247,7 @@ func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
 			starts:    []string{},
 			canCreate: 0,
 		}
-		sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
+		sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0)
 		sch.sync()
 		sch.runQueue()
 		sch.sync()
@@ -361,7 +361,7 @@ func (*SchedulerSuite) TestIdleIn503QuietPeriod(c *check.C) {
 		starts:    []string{},
 		canCreate: 0,
 	}
-	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
+	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0)
 	sch.last503time = time.Now()
 	sch.maxConcurrency = 3
 	sch.sync()
@@ -416,7 +416,7 @@ func (*SchedulerSuite) TestUnlockExcessSupervisors(c *check.C) {
 		starts:    []string{},
 		canCreate: 0,
 	}
-	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 4)
+	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 8, 0.5)
 	sch.sync()
 	sch.runQueue()
 	sch.sync()
@@ -475,7 +475,7 @@ func (*SchedulerSuite) TestExcessSupervisors(c *check.C) {
 		starts:    []string{},
 		canCreate: 0,
 	}
-	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 4)
+	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 8, 0.5)
 	sch.sync()
 	sch.runQueue()
 	sch.sync()
@@ -526,7 +526,7 @@ func (*SchedulerSuite) TestEqualPriorityContainers(c *check.C) {
 		starts:    []string{},
 		canCreate: 0,
 	}
-	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
+	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0)
 	for i := 0; i < 30; i++ {
 		sch.runQueue()
 		sch.sync()
@@ -628,7 +628,7 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
 		},
 	}
 	queue.Update()
-	New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0).runQueue()
+	New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0).runQueue()
 	c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
 	c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
 	running := map[string]bool{}
@@ -672,7 +672,7 @@ func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
 		},
 	}
 	queue.Update()
-	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
+	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0)
 	c.Check(pool.running, check.HasLen, 1)
 	sch.sync()
 	for deadline := time.Now().Add(time.Second); len(pool.Running()) > 0 && time.Now().Before(deadline); time.Sleep(time.Millisecond) {
@@ -705,7 +705,7 @@ func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
 	pool := stubPool{
 		unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
 	}
-	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
+	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0)
 	sch.runQueue()
 	sch.updateMetrics()
 
@@ -717,7 +717,7 @@ func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
 	// 'over quota' metric will be 1 because no workers are available and canCreate defaults
 	// to zero.
 	pool = stubPool{}
-	sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
+	sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0)
 	sch.runQueue()
 	sch.updateMetrics()
 
@@ -750,7 +750,7 @@ func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
 		unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
 		running: map[string]time.Time{},
 	}
-	sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
+	sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0)
 	sch.runQueue()
 	sch.updateMetrics()
 
@@ -824,7 +824,7 @@ func (*SchedulerSuite) TestSkipSupervisors(c *check.C) {
 		running:   map[string]time.Time{},
 		canCreate: 0,
 	}
-	New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 2).runQueue()
+	New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 10, 0.2).runQueue()
 	c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType(nil))
 	c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4), test.ContainerUUID(3), test.ContainerUUID(1)})
 }
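
The two supervisor-limit tests now express the old absolute caps as
(maxInstances, fraction) pairs: (8, 0.5) and (10, 0.2) reproduce the previous
limits of 4 and 2 respectively, so the expected scheduling order in each test
is unchanged.
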
diff --git a/lib/dispatchcloud/scheduler/scheduler.go b/lib/dispatchcloud/scheduler/scheduler.go
index 892f2f3ca..70b352d64 100644
--- a/lib/dispatchcloud/scheduler/scheduler.go
+++ b/lib/dispatchcloud/scheduler/scheduler.go
@@ -48,7 +48,8 @@ type Scheduler struct {
 
 	last503time    time.Time // last time API responded 503
 	maxConcurrency int       // dynamic container limit (0 = unlimited), see runQueue()
-	maxSupervisors int       // maximum number of "supervisor" containers (these are containers who's main job is to launch other containers, e.g. workflow runners)
+	maxSupervisors float64   // maximum fraction of "supervisor" containers (these are containers whose main job is to launch other containers, e.g. workflow runners)
+	maxInstances   int       // maximum number of instances the pool will bring up (0 = unlimited)
 
 	mContainersAllocatedNotStarted   prometheus.Gauge
 	mContainersNotAllocatedOverQuota prometheus.Gauge
@@ -61,7 +62,7 @@ type Scheduler struct {
 //
 // Any given queue and pool should not be used by more than one
 // scheduler at a time.
-func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration, maxSupervisors int) *Scheduler {
+func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration, maxInstances int, maxSupervisors float64) *Scheduler {
 	sch := &Scheduler{
 		logger:              ctxlog.FromContext(ctx),
 		client:              client,
@@ -74,7 +75,9 @@ func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool
 		stop:                make(chan struct{}),
 		stopped:             make(chan struct{}),
 		uuidOp:              map[string]string{},
+		maxConcurrency:      maxInstances, // initial value -- will be dynamically adjusted
 		maxSupervisors:      maxSupervisors,
+		maxInstances:        maxInstances,
 	}
 	sch.registerMetrics(reg)
 	return sch
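
Note the new initialization above: maxConcurrency starts out equal to
maxInstances and is adjusted dynamically at runtime, while the clamp added in
run_queue.go keeps it from drifting above maxInstances again.
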
diff --git a/lib/dispatchcloud/scheduler/sync_test.go b/lib/dispatchcloud/scheduler/sync_test.go
index 788d94648..1fc56cb8e 100644
--- a/lib/dispatchcloud/scheduler/sync_test.go
+++ b/lib/dispatchcloud/scheduler/sync_test.go
@@ -48,7 +48,7 @@ func (*SchedulerSuite) TestForgetIrrelevantContainers(c *check.C) {
 	ents, _ := queue.Entries()
 	c.Check(ents, check.HasLen, 1)
 
-	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
+	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0)
 	sch.sync()
 
 	ents, _ = queue.Entries()
@@ -80,7 +80,7 @@ func (*SchedulerSuite) TestCancelOrphanedContainers(c *check.C) {
 	ents, _ := queue.Entries()
 	c.Check(ents, check.HasLen, 1)
 
-	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
+	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0)
 
 	// Sync shouldn't cancel the container because it might be
 	// running on the VM with state=="unknown".

-----------------------------------------------------------------------


hooks/post-receive
-- 