[ARVADOS] updated: 1.1.3-69-g498d29a
Git user
git at public.curoverse.com
Fri Feb 16 02:29:32 EST 2018
Summary of changes:
sdk/go/arvados/container.go | 3 +
.../crunch-dispatch-slurm/crunch-dispatch-slurm.go | 54 ++++--------
.../crunch-dispatch-slurm_test.go | 34 ++------
services/crunch-dispatch-slurm/priority.go | 10 +--
services/crunch-dispatch-slurm/priority_test.go | 32 ++++----
services/crunch-dispatch-slurm/slurm.go | 4 +-
services/crunch-dispatch-slurm/squeue.go | 95 ++++++++++++++++------
7 files changed, 116 insertions(+), 116 deletions(-)
via 498d29adee40f671fd2924c410226db7a6a0ba93 (commit)
from da17cdccd11d66a10cbc3bf7fbd8c84b49d4a67c (commit)
The revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit 498d29adee40f671fd2924c410226db7a6a0ba93
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Fri Feb 16 02:29:22 2018 -0500
12552: Adjust SLURM niceness to bring priority into desired order.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/sdk/go/arvados/container.go b/sdk/go/arvados/container.go
index 20d007c..daafc49 100644
--- a/sdk/go/arvados/container.go
+++ b/sdk/go/arvados/container.go
@@ -4,9 +4,12 @@
package arvados
+import "time"
+
// Container is an arvados#container resource.
type Container struct {
UUID string `json:"uuid"`
+ CreatedAt time.Time `json:"created_at"`
Command []string `json:"command"`
ContainerImage string `json:"container_image"`
Cwd string `json:"cwd"`
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 0879de2..dab6025 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -25,6 +25,8 @@ import (
"github.com/coreos/go-systemd/daemon"
)
+const initialNiceValue int64 = 10000
+
var (
version = "dev"
defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
@@ -40,6 +42,7 @@ type Dispatcher struct {
SbatchArguments []string
PollPeriod arvados.Duration
+ PrioritySpread int64
// crunch-run command to invoke. The container UUID will be
// appended. If nil, []string{"crunch-run"} will be used.
@@ -150,8 +153,9 @@ func (disp *Dispatcher) setup() {
disp.slurm = &slurmCLI{}
disp.sqCheck = &SqueueChecker{
- Period: time.Duration(disp.PollPeriod),
- Slurm: disp.slurm,
+ Period: time.Duration(disp.PollPeriod),
+ PrioritySpread: disp.PrioritySpread,
+ Slurm: disp.slurm,
}
disp.Dispatcher = &dispatch.Dispatcher{
Arv: arv,
@@ -194,17 +198,6 @@ func (disp *Dispatcher) checkSqueueForOrphans() {
}
}
-func (disp *Dispatcher) niceness(priority int) int {
- if priority > 1000 {
- priority = 1000
- }
- if priority < 0 {
- priority = 0
- }
- // Niceness range 1-10000
- return (1000 - priority) * 10
-}
-
func (disp *Dispatcher) sbatchArgs(container arvados.Container) ([]string, error) {
mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM) / float64(1048576)))
@@ -222,7 +215,7 @@ func (disp *Dispatcher) sbatchArgs(container arvados.Container) ([]string, error
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem=%d", mem))
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--tmp=%d", disk))
- sbatchArgs = append(sbatchArgs, fmt.Sprintf("--nice=%d", disp.niceness(container.Priority)))
+ sbatchArgs = append(sbatchArgs, fmt.Sprintf("--nice=%d", initialNiceValue))
if len(container.SchedulingParameters.Partitions) > 0 {
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
}
@@ -323,12 +316,19 @@ func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
log.Printf("container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
disp.scancel(ctr)
} else {
- disp.renice(updated)
+ p := int64(updated.Priority)
+ if p <= 1000 {
+ // API is providing
+ // user-assigned priority. If
+ // ctrs have equal priority,
+ // run the older one first.
+ p = int64(p)<<50 - (updated.CreatedAt.UnixNano() >> 14)
+ }
+ disp.sqCheck.SetPriority(ctr.UUID, p)
}
}
}
}
-
func (disp *Dispatcher) scancel(ctr arvados.Container) {
disp.sqCheck.L.Lock()
err := disp.slurm.Cancel(ctr.UUID)
@@ -343,28 +343,6 @@ func (disp *Dispatcher) scancel(ctr arvados.Container) {
}
}
-func (disp *Dispatcher) renice(ctr arvados.Container) {
- nice := disp.niceness(ctr.Priority)
- oldnice := disp.sqCheck.GetNiceness(ctr.UUID)
- if nice == oldnice || oldnice == -1 {
- return
- }
- log.Printf("updating slurm nice value to %d (was %d)", nice, oldnice)
- disp.sqCheck.L.Lock()
- err := disp.slurm.Renice(ctr.UUID, nice)
- disp.sqCheck.L.Unlock()
-
- if err != nil {
- log.Printf("renice: %s", err)
- time.Sleep(time.Second)
- return
- }
- if disp.sqCheck.HasUUID(ctr.UUID) {
- log.Printf("container %s has arvados priority %d, slurm nice %d",
- ctr.UUID, ctr.Priority, disp.sqCheck.GetNiceness(ctr.UUID))
- }
-}
-
func (disp *Dispatcher) readConfig(path string) error {
err := config.LoadFile(disp, path)
if err != nil && os.IsNotExist(err) && path == defaultConfigPath {
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 9fb5d66..371f7db 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -74,7 +74,7 @@ func (sf *slurmFake) QueueCommand(args []string) *exec.Cmd {
return exec.Command("echo", sf.queue)
}
-func (sf *slurmFake) Renice(name string, nice int) error {
+func (sf *slurmFake) Renice(name string, nice int64) error {
sf.didRenice = append(sf.didRenice, []string{name, fmt.Sprintf("%d", nice)})
return nil
}
@@ -151,7 +151,7 @@ func (s *IntegrationSuite) integrationTest(c *C,
}
func (s *IntegrationSuite) TestNormal(c *C) {
- s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
+ s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 10000 100\n"}
container := s.integrationTest(c,
nil,
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
@@ -163,7 +163,7 @@ func (s *IntegrationSuite) TestNormal(c *C) {
}
func (s *IntegrationSuite) TestCancel(c *C) {
- s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
+ s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 10000 100\n"}
readyToCancel := make(chan bool)
s.slurm.onCancel = func() { <-readyToCancel }
container := s.integrationTest(c,
@@ -190,7 +190,7 @@ func (s *IntegrationSuite) TestMissingFromSqueue(c *C) {
fmt.Sprintf("--mem=%d", 11445),
fmt.Sprintf("--cpus-per-task=%d", 4),
fmt.Sprintf("--tmp=%d", 45777),
- fmt.Sprintf("--nice=%d", 9990)}},
+ fmt.Sprintf("--nice=%d", 10000)}},
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
time.Sleep(3 * time.Second)
@@ -202,7 +202,7 @@ func (s *IntegrationSuite) TestMissingFromSqueue(c *C) {
func (s *IntegrationSuite) TestSbatchFail(c *C) {
s.slurm = slurmFake{errBatch: errors.New("something terrible happened")}
container := s.integrationTest(c,
- [][]string{{"--job-name=zzzzz-dz642-queuedcontainer", "--mem=11445", "--cpus-per-task=4", "--tmp=45777", "--nice=9990"}},
+ [][]string{{"--job-name=zzzzz-dz642-queuedcontainer", "--mem=11445", "--cpus-per-task=4", "--tmp=45777", "--nice=10000"}},
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
dispatcher.UpdateState(container.UUID, dispatch.Complete)
@@ -220,24 +220,6 @@ func (s *IntegrationSuite) TestSbatchFail(c *C) {
c.Assert(len(ll.Items), Equals, 1)
}
-func (s *IntegrationSuite) TestChangePriority(c *C) {
- s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
- container := s.integrationTest(c, nil,
- func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
- dispatcher.UpdateState(container.UUID, dispatch.Running)
- time.Sleep(time.Second)
- dispatcher.Arv.Update("containers", container.UUID,
- arvadosclient.Dict{
- "container": arvadosclient.Dict{"priority": 600}},
- nil)
- time.Sleep(time.Second)
- dispatcher.UpdateState(container.UUID, dispatch.Complete)
- })
- c.Check(container.State, Equals, arvados.ContainerStateComplete)
- c.Assert(len(s.slurm.didRenice), Not(Equals), 0)
- c.Check(s.slurm.didRenice[len(s.slurm.didRenice)-1], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "4000"})
-}
-
type StubbedSuite struct {
disp Dispatcher
}
@@ -364,7 +346,7 @@ func (s *StubbedSuite) TestSbatchArgs(c *C) {
s.disp.SbatchArguments = defaults
args, err := s.disp.sbatchArgs(container)
- c.Check(args, DeepEquals, append(defaults, "--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990"))
+ c.Check(args, DeepEquals, append(defaults, "--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=10000"))
c.Check(err, IsNil)
}
}
@@ -410,7 +392,7 @@ func (s *StubbedSuite) TestSbatchInstanceTypeConstraint(c *C) {
args, err := s.disp.sbatchArgs(container)
c.Check(err, Equals, trial.err)
if trial.err == nil {
- c.Check(args, DeepEquals, append([]string{"--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990"}, trial.sbatchArgs...))
+ c.Check(args, DeepEquals, append([]string{"--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=10000"}, trial.sbatchArgs...))
}
}
}
@@ -425,7 +407,7 @@ func (s *StubbedSuite) TestSbatchPartition(c *C) {
args, err := s.disp.sbatchArgs(container)
c.Check(args, DeepEquals, []string{
- "--job-name=123", "--mem=239", "--cpus-per-task=1", "--tmp=0", "--nice=9990",
+ "--job-name=123", "--mem=239", "--cpus-per-task=1", "--tmp=0", "--nice=10000",
"--partition=blurb,b2",
})
c.Check(err, IsNil)
diff --git a/services/crunch-dispatch-slurm/priority.go b/services/crunch-dispatch-slurm/priority.go
index 1445d2e..8c27043 100644
--- a/services/crunch-dispatch-slurm/priority.go
+++ b/services/crunch-dispatch-slurm/priority.go
@@ -4,14 +4,6 @@
package main
-import "git.curoverse.com/arvados.git/sdk/go/arvados"
-
-type slurmJob struct {
- ctr *arvados.Container
- priority int64 // current slurm priority (incorporates nice value)
- nice int64 // current slurm nice value
-}
-
// wantNice calculates appropriate nice values for a set of SLURM
// jobs. The returned slice will have len(jobs) elements.
//
@@ -21,7 +13,7 @@ type slurmJob struct {
// produces lower nice values, which is useful for old SLURM versions
// with a limited "nice" range and for sites where SLURM is also
// running non-Arvados jobs with low nice values.
-func wantNice(jobs []slurmJob, spread int64) []int64 {
+func wantNice(jobs []*slurmJob, spread int64) []int64 {
if len(jobs) == 0 {
return nil
}
diff --git a/services/crunch-dispatch-slurm/priority_test.go b/services/crunch-dispatch-slurm/priority_test.go
index a2da4d2..bc9c4dc 100644
--- a/services/crunch-dispatch-slurm/priority_test.go
+++ b/services/crunch-dispatch-slurm/priority_test.go
@@ -15,7 +15,7 @@ type PrioritySuite struct{}
func (s *PrioritySuite) TestReniceCorrect(c *C) {
for _, test := range []struct {
spread int64
- in []slurmJob
+ in []*slurmJob
out []int64
}{
{
@@ -25,17 +25,17 @@ func (s *PrioritySuite) TestReniceCorrect(c *C) {
},
{
0,
- []slurmJob{},
+ []*slurmJob{},
nil,
},
{
10,
- []slurmJob{{priority: 4294000111, nice: 10000}},
+ []*slurmJob{{priority: 4294000111, nice: 10000}},
[]int64{0},
},
{
10,
- []slurmJob{
+ []*slurmJob{
{priority: 4294000111, nice: 10000},
{priority: 4294000111, nice: 10000},
{priority: 4294000111, nice: 10000},
@@ -45,7 +45,7 @@ func (s *PrioritySuite) TestReniceCorrect(c *C) {
},
{ // smaller spread than necessary, but correctly ordered => leave nice alone
10,
- []slurmJob{
+ []*slurmJob{
{priority: 4294000113, nice: 0},
{priority: 4294000112, nice: 1},
{priority: 4294000111, nice: 99},
@@ -54,7 +54,7 @@ func (s *PrioritySuite) TestReniceCorrect(c *C) {
},
{ // larger spread than necessary, but less than 10x => leave nice alone
10,
- []slurmJob{
+ []*slurmJob{
{priority: 4294000144, nice: 0},
{priority: 4294000122, nice: 22},
{priority: 4294000111, nice: 33},
@@ -63,7 +63,7 @@ func (s *PrioritySuite) TestReniceCorrect(c *C) {
},
{ // > 10x spread => reduce nice to achieve spread=10
10,
- []slurmJob{
+ []*slurmJob{
{priority: 4000, nice: 0}, // max pri 4000
{priority: 3000, nice: 999}, // max pri 3999
{priority: 2000, nice: 1998}, // max pri 3998
@@ -72,7 +72,7 @@ func (s *PrioritySuite) TestReniceCorrect(c *C) {
},
{ // > 10x spread, but spread=10 is impossible without negative nice
10,
- []slurmJob{
+ []*slurmJob{
{priority: 4000, nice: 0}, // max pri 4000
{priority: 3000, nice: 500}, // max pri 3500
{priority: 2000, nice: 2000}, // max pri 4000
@@ -81,7 +81,7 @@ func (s *PrioritySuite) TestReniceCorrect(c *C) {
},
{ // reorder
10,
- []slurmJob{
+ []*slurmJob{
{priority: 4000, nice: 0}, // max pri 4000
{priority: 5000, nice: 0}, // max pri 5000
{priority: 6000, nice: 0}, // max pri 6000
@@ -90,7 +90,7 @@ func (s *PrioritySuite) TestReniceCorrect(c *C) {
},
{ // zero spread
0,
- []slurmJob{
+ []*slurmJob{
{priority: 4000, nice: 0}, // max pri 4000
{priority: 5000, nice: 0}, // max pri 5000
{priority: 6000, nice: 0}, // max pri 6000
@@ -107,10 +107,12 @@ func (s *PrioritySuite) TestReniceCorrect(c *C) {
}
// After making the adjustments, calling wantNice
// again should return the same recommendations.
- updated := make([]slurmJob, len(test.in))
+ updated := make([]*slurmJob, len(test.in))
for i, in := range test.in {
- updated[i].nice = test.out[i]
- updated[i].priority = in.priority + in.nice - test.out[i]
+ updated[i] = &slurmJob{
+ nice: test.out[i],
+ priority: in.priority + in.nice - test.out[i],
+ }
}
c.Check(wantNice(updated, test.spread), DeepEquals, test.out)
}
@@ -118,9 +120,9 @@ func (s *PrioritySuite) TestReniceCorrect(c *C) {
func (s *PrioritySuite) TestReniceChurn(c *C) {
const spread = 10
- jobs := make([]slurmJob, 1000)
+ jobs := make([]*slurmJob, 1000)
for i := range jobs {
- jobs[i] = slurmJob{priority: 4294000000 - int64(i), nice: 10000}
+ jobs[i] = &slurmJob{priority: 4294000000 - int64(i), nice: 10000}
}
adjustments := 0
queue := jobs
diff --git a/services/crunch-dispatch-slurm/slurm.go b/services/crunch-dispatch-slurm/slurm.go
index bd19377..735e057 100644
--- a/services/crunch-dispatch-slurm/slurm.go
+++ b/services/crunch-dispatch-slurm/slurm.go
@@ -14,7 +14,7 @@ import (
type Slurm interface {
Cancel(name string) error
- Renice(name string, nice int) error
+ Renice(name string, nice int64) error
QueueCommand(args []string) *exec.Cmd
Batch(script io.Reader, args []string) error
}
@@ -54,7 +54,7 @@ func (scli *slurmCLI) QueueCommand(args []string) *exec.Cmd {
return exec.Command("squeue", args...)
}
-func (scli *slurmCLI) Renice(name string, nice int) error {
+func (scli *slurmCLI) Renice(name string, nice int64) error {
return scli.run(nil, "scontrol", []string{"update", "JobName=" + name, fmt.Sprintf("Nice=%d", nice)})
}
diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go
index adb620e..d15489b 100644
--- a/services/crunch-dispatch-slurm/squeue.go
+++ b/services/crunch-dispatch-slurm/squeue.go
@@ -8,24 +8,28 @@ import (
"bytes"
"fmt"
"log"
+ "sort"
"strings"
"sync"
"time"
)
-type jobPriority struct {
- niceness int
- currentPriority int
+type slurmJob struct {
+ uuid string
+ wantPriority int64
+ priority int64 // current slurm priority (incorporates nice value)
+ nice int64 // current slurm nice value
}
// Squeue implements asynchronous polling monitor of the SLURM queue using the
// command 'squeue'.
type SqueueChecker struct {
- Period time.Duration
- Slurm Slurm
- uuids map[string]jobPriority
- startOnce sync.Once
- done chan struct{}
+ Period time.Duration
+ PrioritySpread int64
+ Slurm Slurm
+ queue map[string]*slurmJob
+ startOnce sync.Once
+ done chan struct{}
sync.Cond
}
@@ -40,22 +44,52 @@ func (sqc *SqueueChecker) HasUUID(uuid string) bool {
// block until next squeue broadcast signaling an update.
sqc.Wait()
- _, exists := sqc.uuids[uuid]
+ _, exists := sqc.queue[uuid]
return exists
}
-// GetNiceness returns the niceness of a given uuid, or -1 if it doesn't exist.
-func (sqc *SqueueChecker) GetNiceness(uuid string) int {
- sqc.startOnce.Do(sqc.start)
+// SetPriority sets or updates the desired (Arvados) priority for a
+// container.
+func (sqc *SqueueChecker) SetPriority(uuid string, want int64) {
+ sqc.L.Lock()
+ defer sqc.L.Unlock()
+ if _, ok := sqc.queue[uuid]; !ok {
+ // Wait in case the slurm job was just submitted and
+ // will appear in the next squeue update.
+ sqc.Wait()
+ if _, ok = sqc.queue[uuid]; !ok {
+ return
+ }
+ }
+ sqc.queue[uuid].wantPriority = want
+}
+// adjust slurm job nice values as needed to ensure slurm priority
+// order matches Arvados priority order.
+func (sqc *SqueueChecker) reniceAll() {
sqc.L.Lock()
defer sqc.L.Unlock()
- n, exists := sqc.uuids[uuid]
- if exists {
- return n.niceness
- } else {
- return -1
+ jobs := make([]*slurmJob, 0, len(sqc.queue))
+ for _, j := range sqc.queue {
+ if j.wantPriority == 0 {
+ // SLURM job with unknown Arvados priority
+ // (perhaps it's not an Arvados job)
+ continue
+ }
+ jobs = append(jobs, j)
+ }
+
+ sort.Slice(jobs, func(i, j int) bool {
+ return jobs[i].wantPriority > jobs[j].wantPriority
+ })
+ renice := wantNice(jobs, sqc.PrioritySpread)
+ for i, job := range jobs {
+ if renice[i] == job.nice {
+ continue
+ }
+ log.Printf("updating slurm priority for %q: nice %d => %d", job.uuid, job.nice, renice[i])
+ sqc.Slurm.Renice(job.uuid, renice[i])
}
}
@@ -68,7 +102,7 @@ func (sqc *SqueueChecker) Stop() {
}
// check gets the names of jobs in the SLURM queue (running and
-// queued). If it succeeds, it updates squeue.uuids and wakes up any
+// queued). If it succeeds, it updates sqc.queue and wakes up any
// goroutines that are waiting in HasUUID() or All().
func (sqc *SqueueChecker) check() {
// Mutex between squeue sync and running sbatch or scancel. This
@@ -87,16 +121,24 @@ func (sqc *SqueueChecker) check() {
}
lines := strings.Split(stdout.String(), "\n")
- sqc.uuids = make(map[string]jobPriority, len(lines))
+ newq := make(map[string]*slurmJob, len(lines))
for _, line := range lines {
+ if line == "" {
+ continue
+ }
var uuid string
- var nice int
- var prio int
- fmt.Sscan(line, &uuid, &nice, &prio)
- if uuid != "" {
- sqc.uuids[uuid] = jobPriority{nice, prio}
+ var n, p int64
+ if _, err := fmt.Sscan(line, &uuid, &n, &p); err != nil {
+ log.Printf("warning: ignoring unparsed line in squeue output: %q", line)
+ continue
+ }
+ newq[uuid] = &slurmJob{
+ uuid: uuid,
+ priority: p,
+ nice: n,
}
}
+ sqc.queue = newq
sqc.Broadcast()
}
@@ -114,6 +156,7 @@ func (sqc *SqueueChecker) start() {
return
case <-ticker.C:
sqc.check()
+ sqc.reniceAll()
}
}
}()
@@ -127,8 +170,8 @@ func (sqc *SqueueChecker) All() []string {
defer sqc.L.Unlock()
sqc.Wait()
var uuids []string
- for uuid := range sqc.uuids {
- uuids = append(uuids, uuid)
+ for u := range sqc.queue {
+ uuids = append(uuids, u)
}
return uuids
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list