[arvados] created: 2.5.0-210-g4447b5a61

git repository hosting git at public.arvados.org
Wed Mar 1 22:03:21 UTC 2023


        at  4447b5a61f79edf2411ba94f4ad5d90e1ca7e220 (commit)


commit 4447b5a61f79edf2411ba94f4ad5d90e1ca7e220
Author: Peter Amstutz <peter.amstutz at curii.com>
Date:   Wed Mar 1 17:01:55 2023 -0500

    20182: Add option to limit the number of supervisor containers
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>

diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index d19135634..882ee62c3 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -1055,6 +1055,16 @@ Clusters:
       # Container runtime: "docker" (default) or "singularity"
       RuntimeEngine: docker
 
+      # Number of "supervisor" containers eligible to run at any given
+      # time, expressed as a fraction of CloudVMs.MaxInstances. A
+      # supervisor is a container whose purpose is to submit other
+      # containers, such as a container running arvados-cwl-runner.
+      # If there is a hard limit on the number of concurrent
+      # containers that the cluster can run, it is important to avoid
+      # crowding out the containers doing useful work with containers
+      # that just create more work.
+      SupervisorFraction: 0.3
+
       # When running a container, run a dedicated keepstore process,
       # using the specified number of 64 MiB memory buffers per
       # allocated CPU core (VCPUs in the container's runtime
diff --git a/lib/config/export.go b/lib/config/export.go
index a8a535eeb..fc688d68d 100644
--- a/lib/config/export.go
+++ b/lib/config/export.go
@@ -142,6 +142,7 @@ var whitelist = map[string]bool{
 	"Containers.ShellAccess.User":              true,
 	"Containers.SLURM":                         false,
 	"Containers.StaleLockTimeout":              false,
+	"Containers.SupervisorFraction":            false,
 	"Containers.SupportedDockerImageFormats":   true,
 	"Containers.SupportedDockerImageFormats.*": true,
 	"Git":                                  false,
diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
index 270c6d43d..1f8272afd 100644
--- a/lib/dispatchcloud/dispatcher.go
+++ b/lib/dispatchcloud/dispatcher.go
@@ -193,7 +193,7 @@ func (disp *dispatcher) run() {
 	if pollInterval <= 0 {
 		pollInterval = defaultPollInterval
 	}
-	sched := scheduler.New(disp.Context, disp.ArvClient, disp.queue, disp.pool, disp.Registry, staleLockTimeout, pollInterval)
+	sched := scheduler.New(disp.Context, disp.ArvClient, disp.queue, disp.pool, disp.Registry, staleLockTimeout, pollInterval, int(float64(disp.Cluster.Containers.CloudVMs.MaxInstances)*disp.Cluster.Containers.SupervisorFraction))
 	sched.Start()
 	defer sched.Stop()
 
diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go
index 1e5ac2e04..65ca904be 100644
--- a/lib/dispatchcloud/scheduler/run_queue.go
+++ b/lib/dispatchcloud/scheduler/run_queue.go
@@ -15,6 +15,10 @@ import (
 
 var quietAfter503 = time.Minute
 
+func isSupervisor(ctr arvados.Container) bool {
+	return (len(ctr.Command) > 0 && ctr.Command[0] == "arvados-cwl-runner") || ctr.SchedulingParameters.Supervisor
+}
+
 func (sch *Scheduler) runQueue() {
 	unsorted, _ := sch.queue.Entries()
 	sorted := make([]container.QueueEnt, 0, len(unsorted))
@@ -84,6 +88,8 @@ func (sch *Scheduler) runQueue() {
 	// reaches the dynamic maxConcurrency limit.
 	trying := len(running)
 
+	supervisors := 0
+
 tryrun:
 	for i, ctr := range sorted {
 		ctr, it := ctr.Container, ctr.InstanceType
@@ -91,6 +97,12 @@ tryrun:
 			"ContainerUUID": ctr.UUID,
 			"InstanceType":  it.Name,
 		})
+		if isSupervisor(ctr) {
+			supervisors += 1
+			if sch.maxSupervisors > 0 && supervisors > sch.maxSupervisors {
+				continue
+			}
+		}
 		if _, running := running[ctr.UUID]; running || ctr.Priority < 1 {
 			continue
 		}
diff --git a/lib/dispatchcloud/scheduler/run_queue_test.go b/lib/dispatchcloud/scheduler/run_queue_test.go
index 855399337..db161b7d7 100644
--- a/lib/dispatchcloud/scheduler/run_queue_test.go
+++ b/lib/dispatchcloud/scheduler/run_queue_test.go
@@ -195,7 +195,7 @@ func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
 		running:   map[string]time.Time{},
 		canCreate: 0,
 	}
-	New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond).runQueue()
+	New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0).runQueue()
 	c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1), test.InstanceType(1), test.InstanceType(1)})
 	c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
 	c.Check(pool.running, check.HasLen, 1)
@@ -247,7 +247,7 @@ func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
 			starts:    []string{},
 			canCreate: 0,
 		}
-		sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
+		sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
 		sch.sync()
 		sch.runQueue()
 		sch.sync()
@@ -318,7 +318,7 @@ func (*SchedulerSuite) TestEqualPriorityContainers(c *check.C) {
 		starts:    []string{},
 		canCreate: 0,
 	}
-	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
+	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
 	for i := 0; i < 30; i++ {
 		sch.runQueue()
 		sch.sync()
@@ -420,7 +420,7 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
 		},
 	}
 	queue.Update()
-	New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond).runQueue()
+	New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0).runQueue()
 	c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
 	c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
 	running := map[string]bool{}
@@ -464,7 +464,7 @@ func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
 		},
 	}
 	queue.Update()
-	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
+	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
 	c.Check(pool.running, check.HasLen, 1)
 	sch.sync()
 	for deadline := time.Now().Add(time.Second); len(pool.Running()) > 0 && time.Now().Before(deadline); time.Sleep(time.Millisecond) {
@@ -497,7 +497,7 @@ func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
 	pool := stubPool{
 		unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
 	}
-	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
+	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
 	sch.runQueue()
 	sch.updateMetrics()
 
@@ -509,7 +509,7 @@ func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
 	// 'over quota' metric will be 1 because no workers are available and canCreate defaults
 	// to zero.
 	pool = stubPool{}
-	sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
+	sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
 	sch.runQueue()
 	sch.updateMetrics()
 
@@ -542,9 +542,78 @@ func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
 		unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
 		running: map[string]time.Time{},
 	}
-	sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
+	sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
 	sch.runQueue()
 	sch.updateMetrics()
 
 	c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 0)
 }
+
+// Start the priority=4, 3, and 1 containers on idle workers. Skip the
+// priority=2 supervisor, which exceeds the limit of 2 supervisors.
+func (*SchedulerSuite) TestSkipSupervisors(c *check.C) {
+	ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
+	queue := test.Queue{
+		ChooseType: chooseType,
+		Containers: []arvados.Container{
+			{
+				UUID:     test.ContainerUUID(1),
+				Priority: 1,
+				State:    arvados.ContainerStateLocked,
+				RuntimeConstraints: arvados.RuntimeConstraints{
+					VCPUs: 1,
+					RAM:   1 << 30,
+				},
+			},
+			{
+				UUID:     test.ContainerUUID(2),
+				Priority: 2,
+				State:    arvados.ContainerStateLocked,
+				RuntimeConstraints: arvados.RuntimeConstraints{
+					VCPUs: 1,
+					RAM:   1 << 30,
+				},
+				Command: []string{"arvados-cwl-runner"},
+			},
+			{
+				UUID:     test.ContainerUUID(3),
+				Priority: 3,
+				State:    arvados.ContainerStateLocked,
+				RuntimeConstraints: arvados.RuntimeConstraints{
+					VCPUs: 1,
+					RAM:   1 << 30,
+				},
+				SchedulingParameters: arvados.SchedulingParameters{
+					Supervisor: true,
+				},
+			},
+			{
+				UUID:     test.ContainerUUID(4),
+				Priority: 4,
+				State:    arvados.ContainerStateLocked,
+				RuntimeConstraints: arvados.RuntimeConstraints{
+					VCPUs: 1,
+					RAM:   1 << 30,
+				},
+				Command: []string{"arvados-cwl-runner"},
+			},
+		},
+	}
+	queue.Update()
+	pool := stubPool{
+		quota: 1000,
+		unalloc: map[arvados.InstanceType]int{
+			test.InstanceType(1): 4,
+			test.InstanceType(2): 4,
+		},
+		idle: map[arvados.InstanceType]int{
+			test.InstanceType(1): 4,
+			test.InstanceType(2): 4,
+		},
+		running:   map[string]time.Time{},
+		canCreate: 0,
+	}
+	New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 2).runQueue()
+	c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType(nil))
+	c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4), test.ContainerUUID(3), test.ContainerUUID(1)})
+}
diff --git a/lib/dispatchcloud/scheduler/scheduler.go b/lib/dispatchcloud/scheduler/scheduler.go
index 4644dc4ea..21510ee09 100644
--- a/lib/dispatchcloud/scheduler/scheduler.go
+++ b/lib/dispatchcloud/scheduler/scheduler.go
@@ -48,6 +48,7 @@ type Scheduler struct {
 
 	last503time    time.Time // last time API responded 503
 	maxConcurrency int       // dynamic container limit (0 = unlimited), see runQueue()
+	maxSupervisors int       // maximum number of "supervisor" containers (0 = unlimited); these are containers whose main job is to launch other containers, e.g. workflow runners
 
 	mContainersAllocatedNotStarted   prometheus.Gauge
 	mContainersNotAllocatedOverQuota prometheus.Gauge
@@ -60,7 +61,7 @@ type Scheduler struct {
 //
 // Any given queue and pool should not be used by more than one
 // scheduler at a time.
-func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
+func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration, maxSupervisors int) *Scheduler {
 	sch := &Scheduler{
 		logger:              ctxlog.FromContext(ctx),
 		client:              client,
@@ -73,6 +74,7 @@ func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool
 		stop:                make(chan struct{}),
 		stopped:             make(chan struct{}),
 		uuidOp:              map[string]string{},
+		maxSupervisors:      maxSupervisors,
 	}
 	sch.registerMetrics(reg)
 	return sch
diff --git a/lib/dispatchcloud/scheduler/sync_test.go b/lib/dispatchcloud/scheduler/sync_test.go
index df254cd32..788d94648 100644
--- a/lib/dispatchcloud/scheduler/sync_test.go
+++ b/lib/dispatchcloud/scheduler/sync_test.go
@@ -48,7 +48,7 @@ func (*SchedulerSuite) TestForgetIrrelevantContainers(c *check.C) {
 	ents, _ := queue.Entries()
 	c.Check(ents, check.HasLen, 1)
 
-	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
+	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
 	sch.sync()
 
 	ents, _ = queue.Entries()
@@ -80,7 +80,7 @@ func (*SchedulerSuite) TestCancelOrphanedContainers(c *check.C) {
 	ents, _ := queue.Entries()
 	c.Check(ents, check.HasLen, 1)
 
-	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
+	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0)
 
 	// Sync shouldn't cancel the container because it might be
 	// running on the VM with state=="unknown".
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index 9d133a4bc..d167addd3 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -506,6 +506,7 @@ type ContainersConfig struct {
 	SupportedDockerImageFormats   StringSet
 	AlwaysUsePreemptibleInstances bool
 	PreemptiblePriceFactor        float64
+	SupervisorFraction            float64
 	RuntimeEngine                 string
 	LocalKeepBlobBuffersPerVCPU   int
 	LocalKeepLogsToContainerLog   string
diff --git a/sdk/go/arvados/container.go b/sdk/go/arvados/container.go
index 165c8112e..7b31726aa 100644
--- a/sdk/go/arvados/container.go
+++ b/sdk/go/arvados/container.go
@@ -121,6 +121,7 @@ type SchedulingParameters struct {
 	Partitions  []string `json:"partitions"`
 	Preemptible bool     `json:"preemptible"`
 	MaxRunTime  int      `json:"max_run_time"`
+	Supervisor  bool     `json:"supervisor"`
 }
 
 // ContainerList is an arvados#containerList resource.

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list